diff --git a/.github/workflows/knative-go-test.yaml b/.github/workflows/knative-go-test.yaml index 75d921e216e..aa3ec66c5d8 100644 --- a/.github/workflows/knative-go-test.yaml +++ b/.github/workflows/knative-go-test.yaml @@ -15,3 +15,5 @@ on: jobs: test: uses: knative/actions/.github/workflows/reusable-go-test.yaml@main + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/config/brokers/mt-channel-broker/roles/controller-clusterrole.yaml b/config/brokers/mt-channel-broker/roles/controller-clusterrole.yaml index a3db7083f43..db89981a354 100644 --- a/config/brokers/mt-channel-broker/roles/controller-clusterrole.yaml +++ b/config/brokers/mt-channel-broker/roles/controller-clusterrole.yaml @@ -38,3 +38,9 @@ rules: - "delete" - "patch" - "watch" + - apiGroups: + - eventing.knative.dev + resources: + - brokers + verbs: + - "knsubscribe" diff --git a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml index c4a9d4d399f..9e1ab6812b4 100644 --- a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml +++ b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml @@ -30,6 +30,7 @@ rules: - list - watch - update + - knsubscribe - apiGroups: - messaging.knative.dev resources: diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index 68f6ca282d7..fe1ed2aa5f1 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -44,7 +44,7 @@ data: # For more details: https://github.com/knative/eventing/issues/5204 new-trigger-filters: "enabled" - # ALPHA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol. + # BETA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol. # For more details: https://github.com/knative/eventing/issues/5957 transport-encryption: "disabled" diff --git a/go.mod b/go.mod index 6616686679b..50ad6288d66 100644 --- a/go.mod +++ b/go.mod @@ -49,8 +49,8 @@ require ( k8s.io/utils v0.0.0-20240102154912-e7106e64919e knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11 - knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458 - knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188 + knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6 + knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91 sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index 79a1f9bb00a..c51c1a51460 100644 --- a/go.sum +++ b/go.sum @@ -892,10 +892,10 @@ knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 h1:CYoD72R8/R35REjeY2nnWfBak knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11 h1:QlqQMJijcdrY5uN6auYRNaZaR9YiukcZ7VQD2SE+a58= knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11/go.mod h1:3pWwBLnTZSM9psSgCAvhKOHIPTzqfEMlWRpDu6IYhK0= -knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458 h1:ESofRToj3xFQfKd5rlwd3EHd7G/CbVpchrUsw1HzI1w= -knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458/go.mod h1:fkgcK/71v1QSJza7pCOxtuk7zSsWYPQ7eiuX8M2wXxs= -knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188 h1:uOzt7ZVFpHoMSfjut8H9d1pafNPF2Luat/w5QMV+CIY= -knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188/go.mod h1:kZEZ0/oQWnS1wBUgQqer/N9k6IzI4jwLLY2xCblEit4= +knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6 h1:mUZ3ZrZFIfHtaILKPodBX1WnFQVpVSdA+e0DaUqIe30= +knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6/go.mod h1:fkgcK/71v1QSJza7pCOxtuk7zSsWYPQ7eiuX8M2wXxs= +knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91 h1:sHh5uGMQg5+yVt6AkTSbbrqeESNFMG6xP8y93cfQXhk= +knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91/go.mod h1:GY8Lr7PMvIf1kb+PKTEQdpVPynMdU3xfIWY7nqeLvb4= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/pkg/apis/eventing/register.go b/pkg/apis/eventing/register.go index bc599aaac1c..ae699f239c9 100644 --- a/pkg/apis/eventing/register.go +++ b/pkg/apis/eventing/register.go @@ -84,6 +84,11 @@ const ( // annotation key used to specify the name of the channel for // the triggers to subscribe to. BrokerChannelNameStatusAnnotationKey = "knative.dev/channelName" + + // BrokerChannelNamespaceStatusAnnotationKey is the broker status + // annotation key used to specify the namespace of the channel for + // the triggers to subscribe to. + BrokerChannelNamespaceStatusAnnotationKey = "knative.dev/channelNamespace" ) var ( diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 6ded61139e5..5276b6f0a5e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -227,7 +227,16 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + var brokerRef, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -239,7 +248,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, requestType: "reply_forward", } @@ -256,7 +265,16 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + var brokerRef, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -298,6 +316,13 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + var brokerRef string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + } else { + brokerRef = trigger.Spec.Broker + } + triggerRef := types.NamespacedName{ Name: trigger.Name, Namespace: trigger.Namespace, @@ -321,7 +346,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), requestType: "filter", } diff --git a/pkg/crossnamespace/validation.go b/pkg/crossnamespace/validation.go index 9a0d173bc97..c7be092e18b 100644 --- a/pkg/crossnamespace/validation.go +++ b/pkg/crossnamespace/validation.go @@ -49,6 +49,9 @@ func CheckNamespace(ctx context.Context, r ResourceInfo) *apis.FieldError { return nil } + // convert the kind (Broker or Channel) into a resource (brokers or channels) + targetResource := strings.ToLower(targetKind) + "s" + // GetUserInfo accesses the UserInfo attached to the webhook context. userInfo := apis.GetUserInfo(ctx) if userInfo == nil { @@ -66,7 +69,7 @@ func CheckNamespace(ctx context.Context, r ResourceInfo) *apis.FieldError { Namespace: targetNamespace, Verb: "knsubscribe", Group: targetGroup, - Resource: targetKind, + Resource: targetResource, } // Create the SubjectAccessReview diff --git a/pkg/eventingtls/trust_bundle.go b/pkg/eventingtls/trust_bundle.go index 17e2565367b..a5c82b7139b 100644 --- a/pkg/eventingtls/trust_bundle.go +++ b/pkg/eventingtls/trust_bundle.go @@ -79,10 +79,10 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB for _, cm := range systemNamespaceBundles { name := userCMName(cm.Name) if p, ok := state[name]; !ok { - state[name] = Pair{sysCM: cm} + state[name] = Pair{sysCM: cm.DeepCopy()} } else { state[name] = Pair{ - sysCM: cm, + sysCM: cm.DeepCopy(), userCm: p.userCm, } } @@ -90,11 +90,11 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB for _, cm := range userNamespaceBundles { if p, ok := state[cm.Name]; !ok { - state[cm.Name] = Pair{userCm: cm} + state[cm.Name] = Pair{userCm: cm.DeepCopy()} } else { state[cm.Name] = Pair{ sysCM: p.sysCM, - userCm: cm, + userCm: cm.DeepCopy(), } } } @@ -107,26 +107,26 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind, Name: obj.GetName(), + UID: obj.GetUID(), } for _, or := range p.userCm.OwnerReferences { + // Only delete the ConfigMap if the object owns it if equality.Semantic.DeepDerivative(expectedOr, or) { if err := deleteConfigMap(ctx, k8s, obj, p.userCm); err != nil { return err } } } - continue } expected := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: userCMName(p.sysCM.Name), - Namespace: obj.GetNamespace(), - Labels: map[string]string{ - TrustBundleLabelKey: TrustBundleLabelValue, - }, + Name: userCMName(p.sysCM.Name), + Namespace: obj.GetNamespace(), + Labels: p.sysCM.Labels, + Annotations: p.sysCM.Annotations, }, Data: p.sysCM.Data, BinaryData: p.sysCM.BinaryData, @@ -135,32 +135,14 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB if p.userCm == nil { // Update owner references expected.OwnerReferences = withOwnerReferences(obj, gvk, []metav1.OwnerReference{}) - err := createConfigMap(ctx, k8s, expected) - if err != nil && !apierrors.IsAlreadyExists(err) { + if err := createConfigMap(ctx, k8s, expected); err != nil { return err } - if apierrors.IsAlreadyExists(err) { - // Update existing ConfigMap - cm, getErr := k8s.CoreV1(). - ConfigMaps(obj.GetNamespace()). - Get(ctx, userCMName(p.sysCM.Name), metav1.GetOptions{}) - if getErr != nil { - return err // return original error - } - - cm.ObjectMeta.DeepCopyInto(&expected.ObjectMeta) - expected.OwnerReferences = withOwnerReferences(obj, gvk, cm.OwnerReferences) - - if !equality.Semantic.DeepDerivative(expected, cm) { - if err := updateConfigMap(ctx, k8s, expected); err != nil { - return err - } - } - } continue } - p.userCm.ObjectMeta.DeepCopyInto(&expected.ObjectMeta) + expected.Generation = p.userCm.Generation + expected.ResourceVersion = p.userCm.ResourceVersion // Update owner references expected.OwnerReferences = withOwnerReferences(obj, gvk, p.userCm.OwnerReferences) @@ -277,26 +259,15 @@ func withOwnerReferences(sb kmeta.Accessor, gvk schema.GroupVersionKind, referen } func deleteConfigMap(ctx context.Context, k8s kubernetes.Interface, sb kmeta.Accessor, cm *corev1.ConfigMap) error { - expectedOr := metav1.OwnerReference{ - APIVersion: sb.GroupVersionKind().GroupVersion().String(), - Kind: sb.GroupVersionKind().Kind, - Name: sb.GetName(), - } - // Only delete the ConfigMap if the object owns it - for _, or := range cm.OwnerReferences { - if equality.Semantic.DeepDerivative(expectedOr, or) { - err := k8s.CoreV1().ConfigMaps(sb.GetNamespace()).Delete(ctx, cm.Name, metav1.DeleteOptions{ - TypeMeta: metav1.TypeMeta{}, - Preconditions: &metav1.Preconditions{ - UID: &cm.UID, - }, - }) - if err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("failed to delete ConfigMap %s/%s: %w", cm.Namespace, cm.Name, err) - } - - return nil - } + err := k8s.CoreV1().ConfigMaps(sb.GetNamespace()).Delete(ctx, cm.Name, metav1.DeleteOptions{ + TypeMeta: metav1.TypeMeta{}, + Preconditions: &metav1.Preconditions{ + UID: &cm.UID, + ResourceVersion: &cm.ResourceVersion, + }, + }) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete ConfigMap %s/%s: %w", cm.Namespace, cm.Name, err) } return nil diff --git a/pkg/graph/constructor.go b/pkg/graph/constructor.go index 96c7c877bd0..c1019a25bfe 100644 --- a/pkg/graph/constructor.go +++ b/pkg/graph/constructor.go @@ -118,19 +118,6 @@ func (g *Graph) AddSource(source duckv1.Source) { v.AddEdge(to, dest, CloudEventOverridesTransform{Overrides: source.Spec.CloudEventOverrides}, true) } -func (g *Graph) getOrCreateVertex(dest *duckv1.Destination) *Vertex { - v, ok := g.vertices[makeComparableDestination(dest)] - if !ok { - v = &Vertex{ - self: dest, - parent: g, - } - g.vertices[makeComparableDestination(dest)] = v - } - - return v -} - func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { brokerRef := &duckv1.KReference{ Name: trigger.Spec.Broker, @@ -155,7 +142,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error { to := g.getOrCreateVertex(&trigger.Spec.Subscriber) //TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later - broker.AddEdge(to, triggerDest, NoTransform{}, false) + broker.AddEdge(to, triggerDest, getTransformForTrigger(trigger), false) if trigger.Spec.Delivery == nil || trigger.Spec.Delivery.DeadLetterSink == nil { return nil @@ -209,3 +196,24 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error { return nil } + +func getTransformForTrigger(trigger eventingv1.Trigger) Transform { + if len(trigger.Spec.Filters) == 0 && trigger.Spec.Filter != nil { + return &AttributesFilterTransform{Filter: trigger.Spec.Filter} + } + + return NoTransform{} +} + +func (g *Graph) getOrCreateVertex(dest *duckv1.Destination) *Vertex { + v, ok := g.vertices[makeComparableDestination(dest)] + if !ok { + v = &Vertex{ + self: dest, + parent: g, + } + g.vertices[makeComparableDestination(dest)] = v + } + + return v +} diff --git a/pkg/graph/transforms.go b/pkg/graph/transforms.go index 26660403eb9..276ba0096a4 100644 --- a/pkg/graph/transforms.go +++ b/pkg/graph/transforms.go @@ -17,6 +17,11 @@ limitations under the License. package graph import ( + "fmt" + "regexp" + "strings" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -33,6 +38,82 @@ func (nt NoTransform) Name() string { return "no-transform" } +type AttributesFilterTransform struct { + Filter *eventingv1.TriggerFilter +} + +var _ Transform = &AttributesFilterTransform{} + +func (aft *AttributesFilterTransform) Apply(et *eventingv1beta3.EventType, tfc TransformFunctionContext) (*eventingv1beta3.EventType, TransformFunctionContext) { + etAttributes := make(map[string]*eventingv1beta3.EventAttributeDefinition) + for i := range et.Spec.Attributes { + etAttributes[et.Spec.Attributes[i].Name] = &et.Spec.Attributes[i] + } + + for k, v := range aft.Filter.Attributes { + if attribute, ok := etAttributes[k]; ok { + if attribute.Value != v { + regexp, err := buildRegexForAttribute(attribute.Value) + if err != nil { + return nil, tfc + } + + if regexp.MatchString(v) { + attribute.Value = v + } else { + return nil, tfc + } + } + } else { + etAttributes[k] = &eventingv1beta3.EventAttributeDefinition{ + Name: k, + Value: v, + Required: true, + } + } + } + + updatedAttributes := make([]eventingv1beta3.EventAttributeDefinition, 0, len(et.Spec.Attributes)) + for _, v := range etAttributes { + updatedAttributes = append(updatedAttributes, *v) + } + + et.Spec.Attributes = updatedAttributes + + return et, tfc +} + +func (aft *AttributesFilterTransform) Name() string { + return "attributes-filter" +} + +func buildRegexForAttribute(attribute string) (*regexp.Regexp, error) { + chunks := []string{"^"} + + var chunk strings.Builder + for i := 0; i < len(attribute); { + if attribute[i] == '{' { + chunks = append(chunks, chunk.String(), "[a-zA-Z]+") + chunk.Reset() + + offset := strings.Index(attribute[i:], "}") + if offset == -1 { + return nil, fmt.Errorf("no closing bracket for variable") + } + + i += offset + 1 + continue + } + + chunk.WriteByte(attribute[i]) + i++ + } + + chunks = append(chunks, chunk.String(), "$") + + return regexp.Compile(strings.Join(chunks, "")) +} + type EventTypeTransform struct { EventType *eventingv1beta3.EventType } diff --git a/pkg/graph/transforms_test.go b/pkg/graph/transforms_test.go new file mode 100644 index 00000000000..6580a9c8452 --- /dev/null +++ b/pkg/graph/transforms_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graph + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" +) + +func TestAttributeFilterTransform(t *testing.T) { + tests := []struct { + name string + input *eventingv1beta3.EventType + expected *eventingv1beta3.EventType + filterAttributes eventingv1.TriggerFilterAttributes + }{ + { + name: "one attribute, none set before", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: make([]eventingv1beta3.EventAttributeDefinition, 0), + }, + }, + expected: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + }, + }, + }, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "example.event.type", + }, + }, + { + name: "two attributes, one set before", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + }, + }, + }, + expected: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + { + Name: "source", + Value: "/sample/source", + Required: true, + }, + }, + }, + }, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "example.event.type", + "source": "/sample/source", + }, + }, + { + name: "one attribute, not compatible with filter", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + }, + }, + }, + expected: nil, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "sample.event.type", + }, + }, + { + name: "one attribute, with variable", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.{variable}.type", + Required: true, + }, + }, + }, + }, + expected: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + }, + }, + }, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "example.event.type", + }, + }, + { + name: "one attribute, with variable at end", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.{variable}", + Required: true, + }, + }, + }, + }, + expected: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.event.type", + Required: true, + }, + }, + }, + }, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "example.event.type", + }, + }, + { + name: "one attribute, with invalid variable (no closing bracket)", + input: &eventingv1beta3.EventType{ + Spec: eventingv1beta3.EventTypeSpec{ + Attributes: []eventingv1beta3.EventAttributeDefinition{ + { + Name: "type", + Value: "example.{variable.type", + Required: true, + }, + }, + }, + }, + expected: nil, + filterAttributes: eventingv1.TriggerFilterAttributes{ + "type": "example.event.type", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + transform := AttributesFilterTransform{Filter: &eventingv1.TriggerFilter{Attributes: test.filterAttributes}} + out, _ := transform.Apply(test.input, TransformFunctionContext{}) + assert.Equal(t, test.expected, out) + }) + } +} diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go index ffa99ec3d12..e48c716c793 100644 --- a/pkg/reconciler/apiserversource/apiserversource_test.go +++ b/pkg/reconciler/apiserversource/apiserversource_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/pkg/kmeta" "knative.dev/pkg/system" @@ -250,6 +251,7 @@ func TestReconcile(t *testing.T) { rttestingv1.WithConfigMapLabels(metav1.LabelSelector{ MatchLabels: map[string]string{ eventingtls.TrustBundleLabelKey: eventingtls.TrustBundleLabelValue, + "x": "y", }, }), ), @@ -262,6 +264,87 @@ func TestReconcile(t *testing.T) { }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. + }, { + Name: "trust bundle propagation - delete config map", + Objects: []runtime.Object{ + rttestingv1.NewApiServerSource(sourceName, testNS, + rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ + Resources: []sourcesv1.APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Namespace", + }}, + SourceSpec: duckv1.SourceSpec{Sink: sinkDest}, + }), + rttestingv1.WithApiServerSourceUID(sourceUID), + rttestingv1.WithApiServerSourceObjectMetaGeneration(generation), + ), + rttestingv1.NewChannel(sinkName, testNS, + rttestingv1.WithInitChannelConditions, + rttestingv1.WithChannelAddress(sinkAddressable), + ), + makeAvailableReceiveAdapter(t, withTrustBundle("bundle")), + rttestingv1.NewConfigMap("bundle"+eventingtls.TrustBundleConfigMapNameSuffix, testNS, + rttestingv1.WithConfigMapData(map[string]string{"a": "a"}), + func(cm *corev1.ConfigMap) { + cm.OwnerReferences = append(cm.OwnerReferences, metav1.OwnerReference{ + APIVersion: "sources.knative.dev/v1", + Kind: "ApiServerSource", + Name: sourceName, + UID: sourceUID, + }) + }, + rttestingv1.WithConfigMapLabels(metav1.LabelSelector{ + MatchLabels: map[string]string{ + eventingtls.TrustBundleLabelKey: eventingtls.TrustBundleLabelValue, + }, + }), + ), + }, + Key: testNS + "/" + sourceName, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: rttestingv1.NewApiServerSource(sourceName, testNS, + rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ + Resources: []sourcesv1.APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Namespace", + }}, + SourceSpec: duckv1.SourceSpec{Sink: sinkDest}, + }), + rttestingv1.WithApiServerSourceUID(sourceUID), + rttestingv1.WithApiServerSourceObjectMetaGeneration(generation), + // Status Update: + rttestingv1.WithInitApiServerSourceConditions, + rttestingv1.WithApiServerSourceDeployed, + rttestingv1.WithApiServerSourceSink(sinkURI), + rttestingv1.WithApiServerSourceSufficientPermissions, + rttestingv1.WithApiServerSourceReferenceModeEventTypes(source), + rttestingv1.WithApiServerSourceStatusObservedGeneration(generation), + rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}), + rttestingv1.WithApiServerSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), + ), + }}, + WantCreates: []runtime.Object{ + makeSubjectAccessReview("namespaces", "get", "default"), + makeSubjectAccessReview("namespaces", "list", "default"), + makeSubjectAccessReview("namespaces", "watch", "default"), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Resource: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}, + Namespace: system.Namespace(), + }, + Name: "bundle" + eventingtls.TrustBundleConfigMapNameSuffix, + }, + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, + WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, + SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, { Name: "trust bundle propagation - update RA", Objects: []runtime.Object{ @@ -303,6 +386,7 @@ func TestReconcile(t *testing.T) { rttestingv1.WithConfigMapLabels(metav1.LabelSelector{ MatchLabels: map[string]string{ eventingtls.TrustBundleLabelKey: eventingtls.TrustBundleLabelValue, + "x": "y", }, }), ), @@ -331,32 +415,7 @@ func TestReconcile(t *testing.T) { ), }}, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeAvailableReceiveAdapter(t, func(deployment *appsv1.Deployment) { - - volumeName := fmt.Sprintf("%s%s", eventingtls.TrustBundleVolumeNamePrefix, "volume") - deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, corev1.Volume{ - Name: volumeName, - VolumeSource: corev1.VolumeSource{ - Projected: &corev1.ProjectedVolumeSource{ - Sources: []corev1.VolumeProjection{ - { - ConfigMap: &corev1.ConfigMapProjection{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "bundle" + eventingtls.TrustBundleConfigMapNameSuffix, - }, - }, - }, - }, - }, - }, - }) - - deployment.Spec.Template.Spec.Containers[0].VolumeMounts = append(deployment.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: volumeName, - ReadOnly: true, - MountPath: eventingtls.TrustBundleMountPath, - }) - }), + Object: makeAvailableReceiveAdapter(t, withTrustBundle("bundle")), }}, WantCreates: []runtime.Object{ makeSubjectAccessReview("namespaces", "get", "default"), @@ -447,6 +506,7 @@ func TestReconcile(t *testing.T) { rttestingv1.WithConfigMapLabels(metav1.LabelSelector{ MatchLabels: map[string]string{ eventingtls.TrustBundleLabelKey: eventingtls.TrustBundleLabelValue, + "x": "y", }, }), ), @@ -1496,6 +1556,35 @@ func TestReconcile(t *testing.T) { )) } +func withTrustBundle(name string) rttestingv1.DeploymentOption { + return func(deployment *appsv1.Deployment) { + + volumeName := fmt.Sprintf("%s%s", eventingtls.TrustBundleVolumeNamePrefix, "volume") + deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + Projected: &corev1.ProjectedVolumeSource{ + Sources: []corev1.VolumeProjection{ + { + ConfigMap: &corev1.ConfigMapProjection{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: name + eventingtls.TrustBundleConfigMapNameSuffix, + }, + }, + }, + }, + }, + }, + }) + + deployment.Spec.Template.Spec.Containers[0].VolumeMounts = append(deployment.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: volumeName, + ReadOnly: true, + MountPath: eventingtls.TrustBundleMountPath, + }) + } +} + func makeReceiveAdapter(t *testing.T, option ...rttestingv1.DeploymentOption) *appsv1.Deployment { return makeReceiveAdapterWithName(t, sourceName, option...) } diff --git a/pkg/reconciler/broker/resources/subscription.go b/pkg/reconciler/broker/resources/subscription.go index 99246aa05a7..7be849f05ad 100644 --- a/pkg/reconciler/broker/resources/subscription.go +++ b/pkg/reconciler/broker/resources/subscription.go @@ -17,37 +17,47 @@ limitations under the License. package resources import ( + "context" "fmt" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/kmeta" - duckv1 "knative.dev/pkg/apis/duck/v1" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/feature" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" ) // NewSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'dest' // replying to brokerIngress. -func NewSubscription(t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference, dest, reply *duckv1.Destination, delivery *eventingduckv1.DeliverySpec) *messagingv1.Subscription { +func NewSubscription(ctx context.Context, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference, dest, reply *duckv1.Destination, delivery *eventingduckv1.DeliverySpec) *messagingv1.Subscription { + var broker, channelNamespace string + if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) { + broker = t.Spec.BrokerRef.Name + channelNamespace = t.Spec.BrokerRef.Namespace + } else { + broker = t.Spec.Broker + channelNamespace = "" + } return &messagingv1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: t.Namespace, - Name: kmeta.ChildName(fmt.Sprintf("%s-%s-", t.Spec.Broker, t.Name), string(t.GetUID())), + Name: kmeta.ChildName(fmt.Sprintf("%s-%s-", broker, t.Name), string(t.GetUID())), OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(t), }, - Labels: SubscriptionLabels(t), + Labels: SubscriptionLabels(ctx, t), }, Spec: messagingv1.SubscriptionSpec{ Channel: duckv1.KReference{ APIVersion: brokerTrigger.APIVersion, Kind: brokerTrigger.Kind, Name: brokerTrigger.Name, + Namespace: channelNamespace, }, Subscriber: dest, Reply: reply, @@ -58,9 +68,15 @@ func NewSubscription(t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReferenc // SubscriptionLabels generates the labels present on the Subscription linking this Trigger to the // Broker's Channels. -func SubscriptionLabels(t *eventingv1.Trigger) map[string]string { +func SubscriptionLabels(ctx context.Context, t *eventingv1.Trigger) map[string]string { + var broker string + if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) { + broker = t.Spec.BrokerRef.Name + } else { + broker = t.Spec.Broker + } return map[string]string{ - eventing.BrokerLabelKey: t.Spec.Broker, + eventing.BrokerLabelKey: broker, "eventing.knative.dev/trigger": t.Name, } } diff --git a/pkg/reconciler/broker/resources/subscription_test.go b/pkg/reconciler/broker/resources/subscription_test.go index 5002f24eb86..bfe9692baee 100644 --- a/pkg/reconciler/broker/resources/subscription_test.go +++ b/pkg/reconciler/broker/resources/subscription_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -63,7 +64,7 @@ func TestNewSubscription(t *testing.T) { APIVersion: "broker-apiVersion", }, } - got := NewSubscription(trigger, triggerChannelRef, dest, reply, delivery) + got := NewSubscription(context.Background(), trigger, triggerChannelRef, dest, reply, delivery) want := &messagingv1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: "t-namespace", diff --git a/pkg/reconciler/broker/trigger/controller.go b/pkg/reconciler/broker/trigger/controller.go index 8e92136d93a..57067e81929 100644 --- a/pkg/reconciler/broker/trigger/controller.go +++ b/pkg/reconciler/broker/trigger/controller.go @@ -85,7 +85,7 @@ func NewController( impl := triggerreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { return controller.Options{ ConfigStore: featureStore, - PromoteFilterFunc: filterTriggers(r.brokerLister), + PromoteFilterFunc: filterTriggers(featureStore, r.brokerLister), } }) r.impl = impl @@ -94,7 +94,7 @@ func NewController( r.uriResolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) triggerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: filterTriggers(r.brokerLister), + FilterFunc: filterTriggers(featureStore, r.brokerLister), Handler: controller.HandleAll(impl.Enqueue), }) @@ -119,7 +119,7 @@ func NewController( // Reconciler Trigger when the OIDC service account changes oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister()), + FilterFunc: filterOIDCServiceAccounts(featureStore, triggerInformer.Lister(), brokerInformer.Lister()), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) @@ -128,7 +128,7 @@ func NewController( // filterOIDCServiceAccounts returns a function that returns true if the resource passed // is a service account, which is owned by a trigger pointing to a MTChannelBased broker. -func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister) func(interface{}) bool { +func filterOIDCServiceAccounts(featureStore *feature.Store, triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister) func(interface{}) bool { return func(obj interface{}) bool { controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj) if !controlledByTrigger { @@ -150,20 +150,28 @@ func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brok return false } - return filterTriggers(brokerLister)(trigger) + return filterTriggers(featureStore, brokerLister)(trigger) } } // filterTriggers returns a function that returns true if the resource passed // is a trigger pointing to a MTChannelBroker. -func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool { +func filterTriggers(featureStore *feature.Store, lister eventinglisters.BrokerLister) func(interface{}) bool { return func(obj interface{}) bool { trigger, ok := obj.(*eventing.Trigger) if !ok { return false } - b, err := lister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if featureStore.IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef != nil { + broker = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + broker = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + b, err := lister.Brokers(brokerNamespace).Get(broker) if err != nil { return false } diff --git a/pkg/reconciler/broker/trigger/controller_test.go b/pkg/reconciler/broker/trigger/controller_test.go index f9f090e5ba6..6db330e995f 100644 --- a/pkg/reconciler/broker/trigger/controller_test.go +++ b/pkg/reconciler/broker/trigger/controller_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "knative.dev/pkg/logging" "knative.dev/pkg/ptr" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" @@ -38,6 +39,7 @@ import ( apiseventing "knative.dev/eventing/pkg/apis/eventing" eventing "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/feature" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" v1lister "knative.dev/eventing/pkg/client/listers/eventing/v1" testingv1 "knative.dev/eventing/pkg/reconciler/testing/v1" @@ -192,7 +194,8 @@ func TestFilterOIDCServiceAccounts(t *testing.T) { err := triggerInformer.Informer().GetStore().Add(tc.trigger) assert.NoError(t, err) - filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister()) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + filter := filterOIDCServiceAccounts(featureStore, triggerInformer.Lister(), brokerInformer.Lister()) pass := filter(tc.sa) assert.Equal(t, tc.pass, pass) }) @@ -270,7 +273,8 @@ func TestFilterTriggers(t *testing.T) { for _, obj := range tc.brokers { _ = brokerInformer.Informer().GetStore().Add(obj) } - filter := filterTriggers(brokerInformer.Lister()) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + filter := filterTriggers(featureStore, brokerInformer.Lister()) pass := filter(tc.trigger) assert.Equal(t, tc.pass, pass) }) diff --git a/pkg/reconciler/broker/trigger/trigger.go b/pkg/reconciler/broker/trigger/trigger.go index 44cef541a1a..8d9578b5ae1 100644 --- a/pkg/reconciler/broker/trigger/trigger.go +++ b/pkg/reconciler/broker/trigger/trigger.go @@ -57,6 +57,8 @@ import ( ) var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker") +var brokerNamespace string +var broker string const ( // Name of the corev1.Events emitted from the Trigger reconciliation process. @@ -89,15 +91,23 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event { logging.FromContext(ctx).Infow("Reconciling", zap.Any("Trigger", t)) - b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker) + if t.Spec.BrokerRef != nil && feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) { + broker = t.Spec.BrokerRef.Name + brokerNamespace = t.Spec.BrokerRef.Namespace + } else { + broker = t.Spec.Broker + brokerNamespace = t.Namespace + } + + b, err := r.brokerLister.Brokers(brokerNamespace).Get(broker) if err != nil { if apierrs.IsNotFound(err) { - logging.FromContext(ctx).Errorw(fmt.Sprintf("Trigger %s/%s has no broker %q", t.Namespace, t.Name, t.Spec.Broker)) - t.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", t.Spec.Broker) + logging.FromContext(ctx).Errorw(fmt.Sprintf("Trigger %s/%s has no broker %q", t.Namespace, t.Name, broker)) + t.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", broker) // Ok to return nil here. Once the Broker comes available, or Trigger changes, we get requeued. return nil } else { - t.Status.MarkBrokerFailed("FailedToGetBroker", "Failed to get broker %q : %s", t.Spec.Broker, err) + t.Status.MarkBrokerFailed("FailedToGetBroker", "Failed to get broker %q : %s", broker, err) return err } } @@ -276,7 +286,7 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 delivery.DeadLetterSink = dls } - expected = resources.NewSubscription(t, brokerTrigger, dest, reply, delivery) + expected = resources.NewSubscription(ctx, t, brokerTrigger, dest, reply, delivery) } else { // in case OIDC is not enabled, we don't need to route everything throuh // broker-filter because we need it only then to add the token from the @@ -290,7 +300,7 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 }, } - expected = resources.NewSubscription(t, brokerTrigger, dest, reply, delivery) + expected = resources.NewSubscription(ctx, t, brokerTrigger, dest, reply, delivery) } sub, err := r.subscriptionLister.Subscriptions(t.Namespace).Get(expected.Name) diff --git a/pkg/reconciler/broker/trigger/trigger_test.go b/pkg/reconciler/broker/trigger/trigger_test.go index c7582c7ff44..306f46e04dc 100644 --- a/pkg/reconciler/broker/trigger/trigger_test.go +++ b/pkg/reconciler/broker/trigger/trigger_test.go @@ -22,11 +22,14 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" + authenticationv1 "k8s.io/api/authentication/v1" + authv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" clientgotesting "k8s.io/client-go/testing" "k8s.io/utils/pointer" "knative.dev/pkg/apis" @@ -37,6 +40,7 @@ import ( v1b1addr "knative.dev/pkg/client/injection/ducks/duck/v1beta1/addressable" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/injection" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/network" @@ -72,10 +76,14 @@ import ( ) const ( - systemNS = "knative-testing" - testNS = "test-namespace" - brokerName = "test-broker" - dlsName = "test-dls" + systemNS = "knative-testing" + testNS = "test-namespace" + brokerName = "test-broker" + brokerNS = "test-broker-namespace" + brokerGroup = "eventing.knative.dev" + brokerKind = "Broker" + brokerVersion = "v1" + dlsName = "test-dls" configMapName = "test-configmap" @@ -113,6 +121,7 @@ kind: "InMemoryChannel" ) var ( + ctx = context.Background() subscriberURL, _ = apis.ParseURL(subscriberURI) testKey = fmt.Sprintf("%s/%s", testNS, triggerName) @@ -136,6 +145,13 @@ var ( Version: "v1", Kind: "Service", } + + brokerrefGVK = metav1.GroupVersionKind{ + Group: brokerGroup, + Version: brokerVersion, + Kind: brokerKind, + } + brokerDestv1 = duckv1.Destination{ Ref: &duckv1.KReference{ Name: sinkName, @@ -249,6 +265,70 @@ func TestReconcile(t *testing.T) { WithTriggerSubscriberURI(subscriberURI), WithTriggerBrokerFailed("nofilter", "NoFilter")), }, + }, { + Name: "Broker cross-namespace reference", + Key: testKey, + Ctx: settingCtxforCrossNamespaceEventLinks("test-user"), + Objects: []runtime.Object{ + NewBroker(brokerName, brokerNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithChannelNamespaceAnnotation(brokerNS)), + NewTriggerWithBrokerRef(triggerName, testNS, + WithTriggerBrokerRef(brokerrefGVK, brokerName, brokerNS), + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + WithInitTriggerConditions, + WithTriggerSubscriberURI(subscriberURI)), + CreateRole("test-role", brokerNS, + WithRoleRules( + WithPolicyRule( + WithAPIGroups([]string{"messaging.knative.dev"}), + WithResources("InMemoryChannels"), + WithVerbs("knsubscribe")), + WithPolicyRule( + WithAPIGroups([]string{"eventing.knative.dev"}), + WithResources("Brokers"), + WithVerbs("knsubscribe")))), + CreateRoleBinding("test-role", brokerNS, + WithRoleBindingSubjects( + WithSubjects( + WithSubjectKind("ServiceAccount"), + WithSubjectName("test-user"))), + WithRoleBindingRoleRef( + WithRoleRef( + WithRoleRefAPIGroup("rbac.authorization.k8s.io"), + WithRoleRefKind("Role"), + WithRoleRefName("test-role")))), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTriggerWithBrokerRef(triggerName, testNS, + WithTriggerBrokerRef(brokerrefGVK, brokerName, brokerNS), + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriptionNotConfigured(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDeadLetterSinkNotConfigured(), + WithTriggerDependencyReady(), + WithTriggerOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), + ), + }}, + WantCreates: []runtime.Object{ + makeFilterSubscriptionWithBrokerRef(brokerNS), + makeSubjectAccessReview("test-user", makeResourceAttributes(brokerNS, triggerChannelName, "knsubscribe", "messaging.knative.dev", "inmemorychannels")), + makeSubjectAccessReview("test-user", makeResourceAttributes(brokerNS, brokerName, "knsubscribe", "eventing.knative.dev", "brokers")), + }, + SkipNamespaceValidation: true, }, { Name: "Creates subscription", Key: testKey, @@ -300,7 +380,7 @@ func TestReconcile(t *testing.T) { WithTriggerRetry(5, nil, nil)), }, WantCreates: []runtime.Object{ - resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(nil, ptr.Int32(5), nil, nil)), + resources.NewSubscription(ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(nil, ptr.Int32(5), nil, nil)), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -334,7 +414,7 @@ func TestReconcile(t *testing.T) { WithTriggerDeadLeaderSink(duckv1.Destination{URI: dlsURL})), }, WantCreates: []runtime.Object{ - resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(&duckv1.Destination{URI: dlsURL}, nil, nil, nil)), + resources.NewSubscription(ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(&duckv1.Destination{URI: dlsURL}, nil, nil, nil)), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -376,6 +456,7 @@ func TestReconcile(t *testing.T) { }, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -438,6 +519,7 @@ func TestReconcile(t *testing.T) { }, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIHTTPS(), @@ -500,6 +582,7 @@ func TestReconcile(t *testing.T) { }, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIHTTPS(), @@ -562,6 +645,7 @@ func TestReconcile(t *testing.T) { }, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -961,6 +1045,7 @@ func TestReconcile(t *testing.T) { }}, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -1028,6 +1113,7 @@ func TestReconcile(t *testing.T) { WantErr: false, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -1090,6 +1176,7 @@ func TestReconcile(t *testing.T) { WantErr: false, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -1153,6 +1240,7 @@ func TestReconcile(t *testing.T) { WantErr: false, WantCreates: []runtime.Object{ resources.NewSubscription( + ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), @@ -1591,7 +1679,7 @@ func TestReconcile(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeEmptyDelivery()), + resources.NewSubscription(ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeEmptyDelivery()), }, WantDeletes: []clientgotesting.DeleteActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ @@ -1640,7 +1728,7 @@ func TestReconcile(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeDLSViaBrokerFilter()), + resources.NewSubscription(ctx, makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeDLSViaBrokerFilter()), }, WantDeletes: []clientgotesting.DeleteActionImpl{{ ActionImpl: clientgotesting.ActionImpl{ @@ -1780,6 +1868,15 @@ func createTriggerChannelRef() *corev1.ObjectReference { } } +func createTriggerChannelRefInDifferentNamespace() *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Namespace: brokerNS, + Name: fmt.Sprintf("%s-kne-trigger", brokerName), + } +} + func makeServiceURI() *duckv1.Destination { return &duckv1.Destination{ URI: &apis.URL{ @@ -1809,7 +1906,11 @@ func makeServiceURIHTTPS() *duckv1.Destination { } func makeFilterSubscription(subscriberNamespace string) *messagingv1.Subscription { - return resources.NewSubscription(makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeEmptyDelivery()) + return resources.NewSubscription(ctx, makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeEmptyDelivery()) +} + +func makeFilterSubscriptionWithBrokerRef(subscriberNamespace string) *messagingv1.Subscription { + return resources.NewSubscription(settingCtxforCrossNamespaceEventLinks("test-user"), makeTriggerWithBrokerRef(subscriberNamespace), createTriggerChannelRefInDifferentNamespace(), makeServiceURI(), makeBrokerRefInDifferentNamespace(), makeEmptyDelivery()) } func makeTrigger(subscriberNamespace string) *eventingv1.Trigger { @@ -1840,6 +1941,37 @@ func makeTrigger(subscriberNamespace string) *eventingv1.Trigger { } } +func makeTriggerWithBrokerRef(subscriberNamespace string) *eventingv1.Trigger { + return &eventingv1.Trigger{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1", + Kind: "Trigger", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: triggerName, + UID: triggerUID, + }, + Spec: eventingv1.TriggerSpec{ + BrokerRef: &duckv1.KReference{ + Name: brokerName, + Namespace: brokerNS, + }, + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{"Source": "Any", "Type": "Any"}, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: subscriberName, + Namespace: subscriberNamespace, + Kind: subscriberKind, + APIVersion: subscriberAPIVersion, + }, + }, + }, + } +} + func makeBrokerRef() *duckv1.Destination { return &duckv1.Destination{ Ref: &duckv1.KReference{ @@ -1851,6 +1983,17 @@ func makeBrokerRef() *duckv1.Destination { } } +func makeBrokerRefInDifferentNamespace() *duckv1.Destination { + return &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + Namespace: brokerNS, + Name: brokerName, + }, + } +} + func makeReplyDestinationViaBrokerFilter() *duckv1.Destination { return &duckv1.Destination{ URI: &apis.URL{ @@ -2071,3 +2214,42 @@ func makeTriggerOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccount { return sa } + +func settingCtxforCrossNamespaceEventLinks(username string) context.Context { + ctx := context.TODO() + flags := feature.Flags{ + feature.CrossNamespaceEventLinks: feature.Enabled, + } + ctx = feature.ToContext(ctx, flags) + + userInfo := &authenticationv1.UserInfo{ + Username: username, + Groups: []string{"system:authenticatedforcrossnamespacelinks"}, + } + ctx = apis.WithUserInfo(ctx, userInfo) + + cfg := &rest.Config{} + ctx = injection.WithConfig(ctx, cfg) + + return ctx +} + +func makeResourceAttributes(namespace, name, verb, group, resource string) authv1.ResourceAttributes { + return authv1.ResourceAttributes{ + Namespace: namespace, + Name: name, + Verb: verb, + Group: group, + Resource: resource, + } +} + +func makeSubjectAccessReview(username string, action authv1.ResourceAttributes) *authv1.SubjectAccessReview { + return &authv1.SubjectAccessReview{ + Spec: authv1.SubjectAccessReviewSpec{ + ResourceAttributes: &action, + User: username, + Groups: []string{"system:authenticatedforcrossnamespacelinks"}, + }, + } +} diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 2c44305b331..d74c1e5664d 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -474,7 +474,7 @@ func TestAllCases(t *testing.T) { WithRoleRules( WithPolicyRule( WithAPIGroups([]string{"messaging.knative.dev"}), - WithResources("InMemoryChannel"), + WithResources("InMemoryChannels"), WithVerbs("knsubscribe")))), // Rolebinding CreateRoleBinding("test-role", channelNS, @@ -507,7 +507,7 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - makeSubjectAccessReview("test-user", makeResourceAttributes(channelNS, channelName, "knsubscribe", "messaging.knative.dev", "InMemoryChannel")), + makeSubjectAccessReview("test-user", makeResourceAttributes(channelNS, channelName, "knsubscribe", "messaging.knative.dev", "inmemorychannels")), }, }, { Name: "subscription goes ready without api version", diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index 4677c628df1..d7ddbd31c90 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -225,6 +225,15 @@ func WithChannelNameAnnotation(name string) BrokerOption { } } +func WithChannelNamespaceAnnotation(namespace string) BrokerOption { + return func(b *v1.Broker) { + if b.Status.Annotations == nil { + b.Status.Annotations = make(map[string]string, 1) + } + b.Status.Annotations[eventing.BrokerChannelNamespaceStatusAnnotationKey] = namespace + } +} + func WithDeadLeaderSink(d duckv1.Destination) BrokerOption { return func(b *v1.Broker) { if b.Spec.Delivery == nil { diff --git a/pkg/reconciler/testing/v1/factory.go b/pkg/reconciler/testing/v1/factory.go index 9adf6420757..0481c6cdfb9 100644 --- a/pkg/reconciler/testing/v1/factory.go +++ b/pkg/reconciler/testing/v1/factory.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "slices" + "strings" "testing" authv1 "k8s.io/api/authorization/v1" @@ -136,8 +137,12 @@ func MakeFactory(ctor Ctor, unstructured bool, logger *zap.SugaredLogger) Factor continue } for _, rule := range role.Rules { + resources := make([]string, 0, len(rule.Resources)) + for _, resource := range rule.Resources { + resources = append(resources, strings.ToLower(resource)) + } if slices.Contains(rule.APIGroups, sar.Spec.ResourceAttributes.Group) && - (slices.Contains(rule.Resources, "*") || slices.Contains(rule.Resources, sar.Spec.ResourceAttributes.Resource)) && + (slices.Contains(rule.Resources, "*") || slices.Contains(resources, strings.ToLower(sar.Spec.ResourceAttributes.Resource))) && slices.Contains(rule.Verbs, sar.Spec.ResourceAttributes.Verb) { res := sar.DeepCopy() res.Status.Allowed = true diff --git a/pkg/reconciler/testing/v1/trigger.go b/pkg/reconciler/testing/v1/trigger.go index 0011ee787a0..24678e82cfb 100644 --- a/pkg/reconciler/testing/v1/trigger.go +++ b/pkg/reconciler/testing/v1/trigger.go @@ -54,6 +54,22 @@ func NewTrigger(name, namespace, broker string, to ...TriggerOption) *v1.Trigger return t } +func NewTriggerWithBrokerRef(name, namespace string, to ...TriggerOption) *v1.Trigger { + t := &v1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + + for _, opt := range to { + opt(t) + } + + t.SetDefaults(context.Background()) + return t +} + func WithTriggerSubscriberURI(rawurl string) TriggerOption { uri, _ := apis.ParseURL(rawurl) return func(t *v1.Trigger) { @@ -163,6 +179,17 @@ func WithTriggerBrokerUnknown(reason, message string) TriggerOption { } } +func WithTriggerBrokerRef(gvk metav1.GroupVersionKind, name string, namespace string) TriggerOption { + return func(t *v1.Trigger) { + t.Spec.BrokerRef = &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + } + } +} + func WithTriggerNotSubscribed(reason, message string) TriggerOption { return func(t *v1.Trigger) { t.Status.MarkNotSubscribed(reason, message) diff --git a/vendor/knative.dev/pkg/reconciler/configstore.go b/vendor/knative.dev/pkg/reconciler/configstore.go index c5c9c319fe6..6b61856d566 100644 --- a/vendor/knative.dev/pkg/reconciler/configstore.go +++ b/vendor/knative.dev/pkg/reconciler/configstore.go @@ -20,6 +20,20 @@ import "context" // ConfigStore is used to attach the frozen configuration to the context. type ConfigStore interface { - // ConfigStore is used to attach the frozen configuration to the context. + // ToContext is used to attach the frozen configuration to the context. ToContext(ctx context.Context) context.Context } + +// ConfigStores is used to combine multiple ConfigStore and attach multiple frozen configurations +// to the context. +type ConfigStores []ConfigStore + +// ConfigStores implements ConfigStore interface. +var _ ConfigStore = ConfigStores{} + +func (stores ConfigStores) ToContext(ctx context.Context) context.Context { + for _, s := range stores { + ctx = s.ToContext(ctx) + } + return ctx +} diff --git a/vendor/knative.dev/reconciler-test/pkg/environment/standard.go b/vendor/knative.dev/reconciler-test/pkg/environment/standard.go index 64da9bbc5cb..bbfdbb25593 100644 --- a/vendor/knative.dev/reconciler-test/pkg/environment/standard.go +++ b/vendor/knative.dev/reconciler-test/pkg/environment/standard.go @@ -87,6 +87,22 @@ func NewStandardGlobalEnvironmentWithRestConfig(cfg *rest.Config, opts ...Config // features to pull Kubernetes clients or the test environment out of the // context passed in the features. var startInformers func() + + if config.Config == nil { + config.Config = injection.ParseAndGetRESTConfigOrDie() + } + // Respect user provided settings, but if omitted customize the default behavior. + // + // Use 20 times the default QPS and Burst to speed up testing since this client is used by + // every running test. + multiplier := 20 + if config.Config.QPS == 0 { + config.Config.QPS = rest.DefaultQPS * float32(multiplier) + } + if config.Config.Burst == 0 { + config.Config.Burst = rest.DefaultBurst * multiplier + } + ctx, startInformers = injection.EnableInjectionOrDie(ctx, config.Config) // global is used to make instances of Environments, NewGlobalEnvironment diff --git a/vendor/knative.dev/reconciler-test/pkg/environment/timings.go b/vendor/knative.dev/reconciler-test/pkg/environment/timings.go index bf4409b06c4..b4a84add593 100644 --- a/vendor/knative.dev/reconciler-test/pkg/environment/timings.go +++ b/vendor/knative.dev/reconciler-test/pkg/environment/timings.go @@ -26,7 +26,7 @@ import ( // this has been moved to state pkg to break cycle between environment and feature package, // keeping the consts here for backwards API compatibility const ( - DefaultPollInterval = 3 * time.Second + DefaultPollInterval = 1 * time.Second DefaultPollTimeout = 2 * time.Minute ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 35ada83214e..980abb11953 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1245,7 +1245,7 @@ knative.dev/hack/schema/commands knative.dev/hack/schema/docs knative.dev/hack/schema/registry knative.dev/hack/schema/schema -# knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458 +# knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6 ## explicit; go 1.21 knative.dev/pkg/apiextensions/storageversion knative.dev/pkg/apiextensions/storageversion/cmd/migrate @@ -1397,7 +1397,7 @@ knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/conversion knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188 +# knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91 ## explicit; go 1.21 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment