diff --git a/pkg/apis/sources/v1/sinkbinding_lifecycle.go b/pkg/apis/sources/v1/sinkbinding_lifecycle.go index 0138839f2f5..5705a5b5b0c 100644 --- a/pkg/apis/sources/v1/sinkbinding_lifecycle.go +++ b/pkg/apis/sources/v1/sinkbinding_lifecycle.go @@ -23,6 +23,7 @@ import ( "strings" "go.uber.org/zap" + "k8s.io/client-go/kubernetes" corev1listers "k8s.io/client-go/listers/core/v1" corev1 "k8s.io/api/core/v1" @@ -196,13 +197,30 @@ func (sb *SinkBinding) Do(ctx context.Context, ps *duckv1.WithPod) { Value: ceOverrides, }) } - - pss, err := eventingtls.AddTrustBundleVolumes(GetTrustBundleConfigMapLister(ctx), sb, &ps.Spec.Template.Spec) + gvk := schema.GroupVersionKind{ + Group: SchemeGroupVersion.Group, + Version: SchemeGroupVersion.Version, + Kind: "SinkBinding", + } + bundles, err := eventingtls.PropagateTrustBundles(ctx, getKubeClient(ctx), GetTrustBundleConfigMapLister(ctx), gvk, sb) if err != nil { - logging.FromContext(ctx).Errorw("Failed to add trust bundle volumes %s/%s: %+v", zap.Error(err)) - return + logging.FromContext(ctx).Errorw("Failed to propagate trust bundles", zap.Error(err)) + } + if len(bundles) > 0 { + pss, err := eventingtls.AddTrustBundleVolumesFromConfigMaps(bundles, &ps.Spec.Template.Spec) + if err != nil { + logging.FromContext(ctx).Errorw("Failed to add trust bundle volumes from configmaps %s/%s: %+v", zap.Error(err)) + return + } + ps.Spec.Template.Spec = *pss + } else { + pss, err := eventingtls.AddTrustBundleVolumes(GetTrustBundleConfigMapLister(ctx), sb, &ps.Spec.Template.Spec) + if err != nil { + logging.FromContext(ctx).Errorw("Failed to add trust bundle volumes %s/%s: %+v", zap.Error(err)) + return + } + ps.Spec.Template.Spec = *pss } - ps.Spec.Template.Spec = *pss if sb.Status.OIDCTokenSecretName != nil { ps.Spec.Template.Spec.Volumes = append(ps.Spec.Template.Spec.Volumes, corev1.Volume{ @@ -310,6 +328,20 @@ func (sb *SinkBinding) Undo(ctx context.Context, ps *duckv1.WithPod) { } } +type kubeClientKey struct{} + +func WithKubeClient(ctx context.Context, k kubernetes.Interface) context.Context { + return context.WithValue(ctx, kubeClientKey{}, k) +} + +func getKubeClient(ctx context.Context) kubernetes.Interface { + k := ctx.Value(kubeClientKey{}) + if k == nil { + panic("No Kube client found in context.") + } + return k.(kubernetes.Interface) +} + type configMapListerKey struct{} func WithTrustBundleConfigMapLister(ctx context.Context, lister corev1listers.ConfigMapLister) context.Context { diff --git a/pkg/apis/sources/v1/sinkbinding_lifecycle_test.go b/pkg/apis/sources/v1/sinkbinding_lifecycle_test.go index bae0d75509f..d624c213db1 100644 --- a/pkg/apis/sources/v1/sinkbinding_lifecycle_test.go +++ b/pkg/apis/sources/v1/sinkbinding_lifecycle_test.go @@ -32,6 +32,7 @@ import ( "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" + kubeclient "knative.dev/pkg/client/injection/kube/client/fake" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" "knative.dev/pkg/resolver" @@ -906,6 +907,7 @@ func TestSinkBindingDo(t *testing.T) { } ctx = WithURIResolver(ctx, r) ctx = WithTrustBundleConfigMapLister(ctx, configmapinformer.Get(ctx).Lister()) + ctx = WithKubeClient(ctx, kubeclient.Get(ctx)) for _, cm := range test.configMaps { _ = configmapinformer.Get(ctx).Informer().GetIndexer().Add(cm) diff --git a/pkg/eventingtls/trust_bundle.go b/pkg/eventingtls/trust_bundle.go index a5c82b7139b..6eff1868f21 100644 --- a/pkg/eventingtls/trust_bundle.go +++ b/pkg/eventingtls/trust_bundle.go @@ -57,18 +57,20 @@ var ( // PropagateTrustBundles propagates Trust bundles ConfigMaps from the system.Namespace() to the // obj namespace. -func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustBundleConfigMapLister corev1listers.ConfigMapLister, gvk schema.GroupVersionKind, obj kmeta.Accessor) error { +func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustBundleConfigMapLister corev1listers.ConfigMapLister, gvk schema.GroupVersionKind, obj kmeta.Accessor) ([]*corev1.ConfigMap, error) { systemNamespaceBundles, err := trustBundleConfigMapLister.ConfigMaps(system.Namespace()).List(TrustBundleSelector) if err != nil { - return fmt.Errorf("failed to list trust bundle ConfigMaps in %q: %w", system.Namespace(), err) + return nil, fmt.Errorf("failed to list trust bundle ConfigMaps in %q: %w", system.Namespace(), err) } userNamespaceBundles, err := trustBundleConfigMapLister.ConfigMaps(obj.GetNamespace()).List(TrustBundleSelector) if err != nil { - return fmt.Errorf("failed to list trust bundles ConfigMaps in %q: %w", obj.GetNamespace(), err) + return nil, fmt.Errorf("failed to list trust bundles ConfigMaps in %q: %w", obj.GetNamespace(), err) } + outputUserNamespaceBundles := make([]*corev1.ConfigMap, 0, len(systemNamespaceBundles)) + type Pair struct { sysCM *corev1.ConfigMap userCm *corev1.ConfigMap @@ -114,7 +116,7 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB // Only delete the ConfigMap if the object owns it if equality.Semantic.DeepDerivative(expectedOr, or) { if err := deleteConfigMap(ctx, k8s, obj, p.userCm); err != nil { - return err + return nil, err } } } @@ -136,8 +138,9 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB // Update owner references expected.OwnerReferences = withOwnerReferences(obj, gvk, []metav1.OwnerReference{}) if err := createConfigMap(ctx, k8s, expected); err != nil { - return err + return nil, err } + outputUserNamespaceBundles = append(outputUserNamespaceBundles, expected) continue } @@ -146,13 +149,17 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB // Update owner references expected.OwnerReferences = withOwnerReferences(obj, gvk, p.userCm.OwnerReferences) - if !equality.Semantic.DeepDerivative(expected, p.userCm) { + if !equality.Semantic.DeepDerivative(expected.Data, p.userCm.Data) || + !equality.Semantic.DeepDerivative(expected.BinaryData, p.userCm.BinaryData) || + !equality.Semantic.DeepDerivative(expected.Labels, p.userCm.Labels) { if err := updateConfigMap(ctx, k8s, expected); err != nil { - return err + return nil, err } } + outputUserNamespaceBundles = append(outputUserNamespaceBundles, expected) } - return nil + + return outputUserNamespaceBundles, nil } func AddTrustBundleVolumes(trustBundleLister corev1listers.ConfigMapLister, obj kmeta.Accessor, pt *corev1.PodSpec) (*corev1.PodSpec, error) { @@ -160,7 +167,10 @@ func AddTrustBundleVolumes(trustBundleLister corev1listers.ConfigMapLister, obj if err != nil { return nil, fmt.Errorf("failed to list trust bundles ConfigMaps in %q: %w", obj.GetNamespace(), err) } + return AddTrustBundleVolumesFromConfigMaps(cms, pt) +} +func AddTrustBundleVolumesFromConfigMaps(cms []*corev1.ConfigMap, pt *corev1.PodSpec) (*corev1.PodSpec, error) { pt = pt.DeepCopy() sources := make([]corev1.VolumeProjection, 0, len(cms)) for _, cm := range cms { diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index a4051f378ab..2f712ae53ee 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -464,5 +464,6 @@ func (r *Reconciler) propagateTrustBundles(ctx context.Context, source *v1.ApiSe Version: v1.SchemeGroupVersion.Version, Kind: "ApiServerSource", } - return eventingtls.PropagateTrustBundles(ctx, r.kubeClientSet, r.trustBundleConfigMapLister, gvk, source) + _, err := eventingtls.PropagateTrustBundles(ctx, r.kubeClientSet, r.trustBundleConfigMapLister, gvk, source) + return err } diff --git a/pkg/reconciler/sinkbinding/controller.go b/pkg/reconciler/sinkbinding/controller.go index 573b3c737e6..bcaec7af49d 100644 --- a/pkg/reconciler/sinkbinding/controller.go +++ b/pkg/reconciler/sinkbinding/controller.go @@ -142,8 +142,9 @@ func NewController( trustBundleConfigMapLister: trustBundleConfigMapLister, } + k8s := kubeclient.Get(ctx) c.WithContext = func(ctx context.Context, b psbinding.Bindable) (context.Context, error) { - return v1.WithTrustBundleConfigMapLister(v1.WithURIResolver(ctx, sbResolver), trustBundleConfigMapLister), nil + return v1.WithKubeClient(v1.WithTrustBundleConfigMapLister(v1.WithURIResolver(ctx, sbResolver), trustBundleConfigMapLister), k8s), nil } c.Tracker = impl.Tracker c.Factory = &duck.CachedInformerFactory{ @@ -226,9 +227,10 @@ func ListAll(ctx context.Context, handler cache.ResourceEventHandler) psbinding. func WithContextFactory(ctx context.Context, lister corev1listers.ConfigMapLister, handler func(types.NamespacedName)) psbinding.BindableContext { r := resolver.NewURIResolverFromTracker(ctx, tracker.New(handler, controller.GetTrackerLease(ctx))) + k := kubeclient.Get(ctx) return func(ctx context.Context, b psbinding.Bindable) (context.Context, error) { - return v1.WithTrustBundleConfigMapLister(v1.WithURIResolver(ctx, r), lister), nil + return v1.WithKubeClient(v1.WithTrustBundleConfigMapLister(v1.WithURIResolver(ctx, r), lister), k), nil } } diff --git a/pkg/reconciler/sinkbinding/sinkbinding.go b/pkg/reconciler/sinkbinding/sinkbinding.go index 74744d24453..665fa2b2d50 100644 --- a/pkg/reconciler/sinkbinding/sinkbinding.go +++ b/pkg/reconciler/sinkbinding/sinkbinding.go @@ -245,5 +245,6 @@ func (s *SinkBindingSubResourcesReconciler) propagateTrustBundles(ctx context.Co Version: v1.SchemeGroupVersion.Version, Kind: "SinkBinding", } - return eventingtls.PropagateTrustBundles(ctx, s.kubeclient, s.trustBundleConfigMapLister, gvk, sb) + _, err := eventingtls.PropagateTrustBundles(ctx, s.kubeclient, s.trustBundleConfigMapLister, gvk, sb) + return err } diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go index f9ff6f9094e..92077775b5f 100644 --- a/test/rekt/apiserversource_test.go +++ b/test/rekt/apiserversource_test.go @@ -105,12 +105,13 @@ func TestApiServerSourceDataPlaneTLS(t *testing.T) { knative.WithLoggingConfig, knative.WithTracingConfig, k8s.WithEventListener, - //environment.Managed(t), + environment.Managed(t), eventshub.WithTLS(t), ) env.ParallelTest(ctx, t, apiserversourcefeatures.SendsEventsWithTLS()) env.ParallelTest(ctx, t, apiserversourcefeatures.SendsEventsWithTLSTrustBundle()) + env.ParallelTest(ctx, t, apiserversourcefeatures.SendsEventsWithTLSWithAdditionalTrustBundle()) } func TestApiServerSourceDataPlane_EventModes(t *testing.T) { diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index 6cca352dd22..478326e727b 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -358,6 +358,7 @@ func TestInMemoryChannelTLS(t *testing.T) { env.ParallelTest(ctx, t, channel.SubscriptionTLS()) env.ParallelTest(ctx, t, channel.SubscriptionTLSTrustBundle()) + env.ParallelTest(ctx, t, channel.SubscriptionTLSWithAdditionalTrustBundle()) } func TestChannelImplDispatcherAuthenticatesWithOIDC(t *testing.T) { diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index 40b31c6fa73..86cfc4eb3ae 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -26,6 +26,7 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/knative" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/eventing/test/rekt/resources/addressable" @@ -272,6 +273,62 @@ func SendsEventsWithTLSTrustBundle() *feature.Feature { return f } +func SendsEventsWithTLSWithAdditionalTrustBundle() *feature.Feature { + src := feature.MakeRandomK8sName("apiserversource") + sink := feature.MakeRandomK8sName("sink") + trustBundle := feature.MakeRandomK8sName("trust-bundle") + + f := feature.NewFeatureNamed("Send events to TLS sink - additional trust bundle") + + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + + f.Setup("Add trust bundle to system namespace", func(ctx context.Context, t feature.T) { + + configmap.Install(trustBundle, knative.KnativeNamespaceFromContext(ctx), + configmap.WithLabels(map[string]string{"networking.knative.dev/trust-bundle": "true"}), + configmap.WithData("ca.crt", *eventshub.GetCaCerts(ctx)), + )(ctx, t) + }) + + sacmName := feature.MakeRandomK8sName("apiserversource") + f.Requirement("Create Service Account for ApiServerSource with RBAC for v1.Event resources", + setupAccountAndRoleForPods(sacmName)) + + cfg := []manifest.CfgFn{ + apiserversource.WithServiceAccountName(sacmName), + apiserversource.WithEventMode(v1.ResourceMode), + apiserversource.WithResources(v1.APIVersionKindSelector{ + APIVersion: "v1", + Kind: "Event", + }), + } + + f.Requirement("install ApiServerSource", func(ctx context.Context, t feature.T) { + cfg = append(cfg, apiserversource.WithSink(&duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(sink, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the new trust-bundle + })) + apiserversource.Install(src, cfg...)(ctx, t) + }) + f.Requirement("ApiServerSource goes ready", apiserversource.IsReady(src)) + + f.Stable("ApiServerSource as event source"). + Must("delivers events on sink with ref", + eventassert.OnStore(sink). + Match(eventassert.MatchKind(eventshub.EventReceived)). + MatchEvent(test.HasType("dev.knative.apiserver.resource.update")). + AtLeast(1), + ). + Must("Set sinkURI to HTTPS endpoint", source.ExpectHTTPSSink(apiserversource.Gvr(), src)) + + return f +} + // SendsEventsWithEventTypes tests apiserversource to a ready broker. func SendsEventsWithEventTypes() *feature.Feature { source := feature.MakeRandomK8sName("source") diff --git a/test/rekt/features/channel/eventing_tls_feature.go b/test/rekt/features/channel/eventing_tls_feature.go index 3bb633afef8..45b48b7e065 100644 --- a/test/rekt/features/channel/eventing_tls_feature.go +++ b/test/rekt/features/channel/eventing_tls_feature.go @@ -31,6 +31,7 @@ import ( "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/resources/service" "knative.dev/reconciler-test/resources/certificate" @@ -38,6 +39,7 @@ import ( "knative.dev/eventing/test/rekt/features/featureflags" "knative.dev/eventing/test/rekt/resources/addressable" "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/configmap" "knative.dev/eventing/test/rekt/resources/subscription" ) @@ -243,3 +245,88 @@ func SubscriptionTLSTrustBundle() *feature.Feature { return f } + +func SubscriptionTLSWithAdditionalTrustBundle() *feature.Feature { + + channelName := feature.MakeRandomK8sName("channel") + subscriptionName := feature.MakeRandomK8sName("sub") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + dlsName := feature.MakeRandomK8sName("dls") + dlsSubscriptionName := feature.MakeRandomK8sName("dls-sub") + trustBundle := feature.MakeRandomK8sName("trust-bundle") + + f := feature.NewFeature() + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Add trust bundle to system namespace", func(ctx context.Context, t feature.T) { + + configmap.Install(trustBundle, knative.KnativeNamespaceFromContext(ctx), + configmap.WithLabels(map[string]string{"networking.knative.dev/trust-bundle": "true"}), + configmap.WithData("ca.crt", *eventshub.GetCaCerts(ctx)), + )(ctx, t) + }) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + f.Setup("install sink", eventshub.Install(dlsName, eventshub.StartReceiverTLS)) + f.Setup("install channel", channel_impl.Install(channelName)) + f.Setup("channel is ready", channel_impl.IsReady(channelName)) + + f.Setup("install subscription", func(ctx context.Context, t feature.T) { + d := &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(sink, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the new trust-bundle + } + subscription.Install(subscriptionName, + subscription.WithChannel(channel_impl.AsRef(channelName)), + subscription.WithSubscriberFromDestination(d))(ctx, t) + }) + f.Setup("subscription is ready", subscription.IsReady(subscriptionName)) + f.Setup("install dead letter subscription", func(ctx context.Context, t feature.T) { + d := &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(dlsName, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the trust-bundle + } + + subscription.Install(dlsSubscriptionName, + subscription.WithChannel(channel_impl.AsRef(channelName)), + subscription.WithDeadLetterSinkFromDestination(d), + subscription.WithSubscriber(nil, "http://127.0.0.1:2468", ""))(ctx, t) + }) + f.Setup("subscription dead letter is ready", subscription.IsReady(dlsSubscriptionName)) + f.Setup("Channel has HTTPS address", channel_impl.ValidateAddress(channelName, addressable.AssertHTTPSAddress)) + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Requirement("install source", eventshub.Install(source, + eventshub.StartSenderToResourceTLS(channel_impl.GVR(), channelName, nil), + eventshub.InputEvent(event), + // Send multiple events so that we take into account that the certificate rotation might + // be detected by the server after some time. + eventshub.SendMultipleEvents(100, 3*time.Second), + )) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Event received in sink", assert.OnStore(sink). + MatchReceivedEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Event received in dead letter sink", assert.OnStore(dlsName). + MatchReceivedEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + + return f +} diff --git a/test/rekt/features/pingsource/features.go b/test/rekt/features/pingsource/features.go index f59858d6d8c..6052022c4e4 100644 --- a/test/rekt/features/pingsource/features.go +++ b/test/rekt/features/pingsource/features.go @@ -27,6 +27,7 @@ import ( "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" @@ -34,6 +35,7 @@ import ( "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/eventing/test/rekt/resources/addressable" "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/configmap" "knative.dev/eventing/test/rekt/resources/eventtype" "knative.dev/eventing/test/rekt/resources/trigger" @@ -132,6 +134,48 @@ func SendsEventsTLSTrustBundle() *feature.Feature { return f } +func SendsEventsTLSWithAdditionalTrustBundle() *feature.Feature { + src := feature.MakeRandomK8sName("pingsource") + sink := feature.MakeRandomK8sName("sink") + trustBundle := feature.MakeRandomK8sName("trust-bundle") + + f := feature.NewFeature() + + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + + f.Setup("Add trust bundle to system namespace", func(ctx context.Context, t feature.T) { + + configmap.Install(trustBundle, knative.KnativeNamespaceFromContext(ctx), + configmap.WithLabels(map[string]string{"networking.knative.dev/trust-bundle": "true"}), + configmap.WithData("ca.crt", *eventshub.GetCaCerts(ctx)), + )(ctx, t) + }) + + f.Requirement("install pingsource", func(ctx context.Context, t feature.T) { + d := &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(sink, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the trust-bundle + } + + pingsource.Install(src, pingsource.WithSink(d))(ctx, t) + }) + f.Requirement("pingsource goes ready", pingsource.IsReady(src)) + + f.Stable("pingsource as event source"). + Must("delivers events", assert.OnStore(sink). + Match(eventassert.MatchKind(eventshub.EventReceived)). + MatchEvent(test.HasType("dev.knative.sources.ping")). + AtLeast(1)). + Must("Set sinkURI to HTTPS endpoint", source.ExpectHTTPSSink(pingsource.Gvr(), src)) + + return f +} + func SendsEventsWithSinkURI() *feature.Feature { source := feature.MakeRandomK8sName("pingsource") sink := feature.MakeRandomK8sName("sink") diff --git a/test/rekt/features/trigger/feature.go b/test/rekt/features/trigger/feature.go index 219e48535ce..e0ae7b84ff9 100644 --- a/test/rekt/features/trigger/feature.go +++ b/test/rekt/features/trigger/feature.go @@ -27,6 +27,7 @@ import ( "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" @@ -36,6 +37,7 @@ import ( "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/eventing/test/rekt/features/featureflags" "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/configmap" "knative.dev/eventing/test/rekt/resources/pingsource" "knative.dev/eventing/test/rekt/resources/trigger" ) @@ -236,3 +238,84 @@ func TriggerWithTLSSubscriberTrustBundle() *feature.Feature { return f } + +func TriggerWithTLSSubscriberWithAdditionalCATrustBundles() *feature.Feature { + f := feature.NewFeatureNamed("Trigger with TLS subscriber and additional trust bundle") + + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + brokerName := feature.MakeRandomK8sName("broker") + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + triggerName := feature.MakeRandomK8sName("trigger") + dlsName := feature.MakeRandomK8sName("dls") + dlsTriggerName := feature.MakeRandomK8sName("dls-trigger") + trustBundle := feature.MakeRandomK8sName("trust-bundle") + + eventToSend := test.FullEvent() + + // Install Broker + f.Setup("Install Broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Setup("Broker is ready", broker.IsReady(brokerName)) + f.Setup("Broker is addressable", broker.IsAddressable(brokerName)) + + // Install Sink + f.Setup("Install Sink", eventshub.Install(sinkName, eventshub.StartReceiverTLS)) + f.Setup("Install dead letter sink service", eventshub.Install(dlsName, eventshub.StartReceiverTLS)) + + f.Setup("Add trust bundle to system namespace", func(ctx context.Context, t feature.T) { + + configmap.Install(trustBundle, knative.KnativeNamespaceFromContext(ctx), + configmap.WithLabels(map[string]string{"networking.knative.dev/trust-bundle": "true"}), + configmap.WithData("ca.crt", *eventshub.GetCaCerts(ctx)), + )(ctx, t) + }) + + // Install Trigger + f.Setup("Install trigger", func(ctx context.Context, t feature.T) { + subscriber := &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(sinkName, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the new trust-bundle + } + + trigger.Install(triggerName, brokerName, + trigger.WithSubscriberFromDestination(subscriber))(ctx, t) + }) + f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName)) + + f.Setup("Install failing trigger", func(ctx context.Context, t feature.T) { + dls := &duckv1.Destination{ + URI: &apis.URL{ + Scheme: "https", // Force using https + Host: network.GetServiceHostname(dlsName, environment.FromContext(ctx).Namespace()), + }, + CACerts: nil, // CA certs are in the new trust-bundle + } + + linear := eventingv1.BackoffPolicyLinear + trigger.Install(dlsTriggerName, brokerName, + trigger.WithRetry(2, &linear, pointer.String("PT1S")), + trigger.WithDeadLetterSinkFromDestination(dls), + trigger.WithSubscriber(nil, "http://127.0.0.1:2468"))(ctx, t) + }) + f.Setup("Wait for failing Trigger to become ready", trigger.IsReady(dlsTriggerName)) + + // Install Source + f.Requirement("Install Source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(eventToSend), + )) + + f.Assert("Trigger delivers events to TLS subscriber", assert.OnStore(sinkName). + MatchReceivedEvent(test.HasId(eventToSend.ID())). + AtLeast(1)) + f.Assert("Trigger delivers events to TLS dead letter sink", assert.OnStore(dlsName). + MatchReceivedEvent(test.HasId(eventToSend.ID())). + AtLeast(1)) + + return f +} diff --git a/test/rekt/pingsource_test.go b/test/rekt/pingsource_test.go index 1aab02f7e8c..8bc45a0f43d 100644 --- a/test/rekt/pingsource_test.go +++ b/test/rekt/pingsource_test.go @@ -61,6 +61,7 @@ func TestPingSourceTLS(t *testing.T) { env.ParallelTest(ctx, t, pingsource.SendsEventsTLS()) env.ParallelTest(ctx, t, pingsource.SendsEventsTLSTrustBundle()) + env.ParallelTest(ctx, t, pingsource.SendsEventsTLSWithAdditionalTrustBundle()) } func TestPingSourceWithSinkURI(t *testing.T) { diff --git a/test/rekt/trigger_test.go b/test/rekt/trigger_test.go index 66981384270..6b17ae110e9 100644 --- a/test/rekt/trigger_test.go +++ b/test/rekt/trigger_test.go @@ -95,4 +95,5 @@ func TestTriggerTLSSubscriber(t *testing.T) { env.ParallelTest(ctx, t, trigger.TriggerWithTLSSubscriber()) env.ParallelTest(ctx, t, trigger.TriggerWithTLSSubscriberTrustBundle()) + env.ParallelTest(ctx, t, trigger.TriggerWithTLSSubscriberWithAdditionalCATrustBundles()) }