Skip to content

Commit

Permalink
Merge branch 'main' into ca-rotation-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pierDipi committed May 21, 2024
2 parents 50ec78c + 5355171 commit 48de042
Show file tree
Hide file tree
Showing 28 changed files with 817 additions and 142 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/knative-go-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ on:
jobs:
test:
uses: knative/actions/.github/workflows/reusable-go-test.yaml@main
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ rules:
- "delete"
- "patch"
- "watch"
- apiGroups:
- eventing.knative.dev
resources:
- brokers
verbs:
- "knsubscribe"
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rules:
- list
- watch
- update
- knsubscribe
- apiGroups:
- messaging.knative.dev
resources:
Expand Down
2 changes: 1 addition & 1 deletion config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ data:
# For more details: https://github.com/knative/eventing/issues/5204
new-trigger-filters: "enabled"

# ALPHA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol.
# BETA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol.
# For more details: https://github.com/knative/eventing/issues/5957
transport-encryption: "disabled"

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ require (
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11
knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11
knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458
knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188
knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6
knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91
sigs.k8s.io/yaml v1.4.0
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,10 @@ knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11 h1:CYoD72R8/R35REjeY2nnWfBak
knative.dev/hack v0.0.0-20240507013718-68e3bfb39d11/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11 h1:QlqQMJijcdrY5uN6auYRNaZaR9YiukcZ7VQD2SE+a58=
knative.dev/hack/schema v0.0.0-20240507013718-68e3bfb39d11/go.mod h1:3pWwBLnTZSM9psSgCAvhKOHIPTzqfEMlWRpDu6IYhK0=
knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458 h1:ESofRToj3xFQfKd5rlwd3EHd7G/CbVpchrUsw1HzI1w=
knative.dev/pkg v0.0.0-20240513091600-b1fd04d5c458/go.mod h1:fkgcK/71v1QSJza7pCOxtuk7zSsWYPQ7eiuX8M2wXxs=
knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188 h1:uOzt7ZVFpHoMSfjut8H9d1pafNPF2Luat/w5QMV+CIY=
knative.dev/reconciler-test v0.0.0-20240507120221-c76096ce6188/go.mod h1:kZEZ0/oQWnS1wBUgQqer/N9k6IzI4jwLLY2xCblEit4=
knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6 h1:mUZ3ZrZFIfHtaILKPodBX1WnFQVpVSdA+e0DaUqIe30=
knative.dev/pkg v0.0.0-20240515073057-11a3d46fe4d6/go.mod h1:fkgcK/71v1QSJza7pCOxtuk7zSsWYPQ7eiuX8M2wXxs=
knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91 h1:sHh5uGMQg5+yVt6AkTSbbrqeESNFMG6xP8y93cfQXhk=
knative.dev/reconciler-test v0.0.0-20240515112255-ba9aeb3a5b91/go.mod h1:GY8Lr7PMvIf1kb+PKTEQdpVPynMdU3xfIWY7nqeLvb4=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/eventing/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ const (
// annotation key used to specify the name of the channel for
// the triggers to subscribe to.
BrokerChannelNameStatusAnnotationKey = "knative.dev/channelName"

// BrokerChannelNamespaceStatusAnnotationKey is the broker status
// annotation key used to specify the namespace of the channel for
// the triggers to subscribe to.
BrokerChannelNamespaceStatusAnnotationKey = "knative.dev/channelNamespace"
)

var (
Expand Down
33 changes: 29 additions & 4 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,16 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
var brokerRef, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
brokerNamespace = trigger.Spec.BrokerRef.Namespace
} else {
brokerRef = trigger.Spec.Broker
brokerNamespace = trigger.Namespace
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef)
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -239,7 +248,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
broker: brokerRef,
requestType: "reply_forward",
}

Expand All @@ -256,7 +265,16 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
}

func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
var brokerRef, brokerNamespace string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
brokerNamespace = trigger.Spec.BrokerRef.Namespace
} else {
brokerRef = trigger.Spec.Broker
brokerNamespace = trigger.Namespace
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef)
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -298,6 +316,13 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
}

func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
var brokerRef string
if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" {
brokerRef = trigger.Spec.BrokerRef.Name
} else {
brokerRef = trigger.Spec.Broker
}

triggerRef := types.NamespacedName{
Name: trigger.Name,
Namespace: trigger.Namespace,
Expand All @@ -321,7 +346,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
broker: brokerRef,
filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
requestType: "filter",
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/crossnamespace/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func CheckNamespace(ctx context.Context, r ResourceInfo) *apis.FieldError {
return nil
}

// convert the kind (Broker or Channel) into a resource (brokers or channels)
targetResource := strings.ToLower(targetKind) + "s"

// GetUserInfo accesses the UserInfo attached to the webhook context.
userInfo := apis.GetUserInfo(ctx)
if userInfo == nil {
Expand All @@ -66,7 +69,7 @@ func CheckNamespace(ctx context.Context, r ResourceInfo) *apis.FieldError {
Namespace: targetNamespace,
Verb: "knsubscribe",
Group: targetGroup,
Resource: targetKind,
Resource: targetResource,
}

// Create the SubjectAccessReview
Expand Down
73 changes: 22 additions & 51 deletions pkg/eventingtls/trust_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,22 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB
for _, cm := range systemNamespaceBundles {
name := userCMName(cm.Name)
if p, ok := state[name]; !ok {
state[name] = Pair{sysCM: cm}
state[name] = Pair{sysCM: cm.DeepCopy()}
} else {
state[name] = Pair{
sysCM: cm,
sysCM: cm.DeepCopy(),
userCm: p.userCm,
}
}
}

for _, cm := range userNamespaceBundles {
if p, ok := state[cm.Name]; !ok {
state[cm.Name] = Pair{userCm: cm}
state[cm.Name] = Pair{userCm: cm.DeepCopy()}
} else {
state[cm.Name] = Pair{
sysCM: p.sysCM,
userCm: cm,
userCm: cm.DeepCopy(),
}
}
}
Expand All @@ -107,26 +107,26 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: obj.GetName(),
UID: obj.GetUID(),
}

for _, or := range p.userCm.OwnerReferences {
// Only delete the ConfigMap if the object owns it
if equality.Semantic.DeepDerivative(expectedOr, or) {
if err := deleteConfigMap(ctx, k8s, obj, p.userCm); err != nil {
return err
}
}
}

continue
}

expected := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: userCMName(p.sysCM.Name),
Namespace: obj.GetNamespace(),
Labels: map[string]string{
TrustBundleLabelKey: TrustBundleLabelValue,
},
Name: userCMName(p.sysCM.Name),
Namespace: obj.GetNamespace(),
Labels: p.sysCM.Labels,
Annotations: p.sysCM.Annotations,
},
Data: p.sysCM.Data,
BinaryData: p.sysCM.BinaryData,
Expand All @@ -135,32 +135,14 @@ func PropagateTrustBundles(ctx context.Context, k8s kubernetes.Interface, trustB
if p.userCm == nil {
// Update owner references
expected.OwnerReferences = withOwnerReferences(obj, gvk, []metav1.OwnerReference{})
err := createConfigMap(ctx, k8s, expected)
if err != nil && !apierrors.IsAlreadyExists(err) {
if err := createConfigMap(ctx, k8s, expected); err != nil {
return err
}
if apierrors.IsAlreadyExists(err) {
// Update existing ConfigMap
cm, getErr := k8s.CoreV1().
ConfigMaps(obj.GetNamespace()).
Get(ctx, userCMName(p.sysCM.Name), metav1.GetOptions{})
if getErr != nil {
return err // return original error
}

cm.ObjectMeta.DeepCopyInto(&expected.ObjectMeta)
expected.OwnerReferences = withOwnerReferences(obj, gvk, cm.OwnerReferences)

if !equality.Semantic.DeepDerivative(expected, cm) {
if err := updateConfigMap(ctx, k8s, expected); err != nil {
return err
}
}
}
continue
}

p.userCm.ObjectMeta.DeepCopyInto(&expected.ObjectMeta)
expected.Generation = p.userCm.Generation
expected.ResourceVersion = p.userCm.ResourceVersion
// Update owner references
expected.OwnerReferences = withOwnerReferences(obj, gvk, p.userCm.OwnerReferences)

Expand Down Expand Up @@ -277,26 +259,15 @@ func withOwnerReferences(sb kmeta.Accessor, gvk schema.GroupVersionKind, referen
}

func deleteConfigMap(ctx context.Context, k8s kubernetes.Interface, sb kmeta.Accessor, cm *corev1.ConfigMap) error {
expectedOr := metav1.OwnerReference{
APIVersion: sb.GroupVersionKind().GroupVersion().String(),
Kind: sb.GroupVersionKind().Kind,
Name: sb.GetName(),
}
// Only delete the ConfigMap if the object owns it
for _, or := range cm.OwnerReferences {
if equality.Semantic.DeepDerivative(expectedOr, or) {
err := k8s.CoreV1().ConfigMaps(sb.GetNamespace()).Delete(ctx, cm.Name, metav1.DeleteOptions{
TypeMeta: metav1.TypeMeta{},
Preconditions: &metav1.Preconditions{
UID: &cm.UID,
},
})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete ConfigMap %s/%s: %w", cm.Namespace, cm.Name, err)
}

return nil
}
err := k8s.CoreV1().ConfigMaps(sb.GetNamespace()).Delete(ctx, cm.Name, metav1.DeleteOptions{
TypeMeta: metav1.TypeMeta{},
Preconditions: &metav1.Preconditions{
UID: &cm.UID,
ResourceVersion: &cm.ResourceVersion,
},
})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete ConfigMap %s/%s: %w", cm.Namespace, cm.Name, err)
}

return nil
Expand Down
36 changes: 22 additions & 14 deletions pkg/graph/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,6 @@ func (g *Graph) AddSource(source duckv1.Source) {
v.AddEdge(to, dest, CloudEventOverridesTransform{Overrides: source.Spec.CloudEventOverrides}, true)
}

func (g *Graph) getOrCreateVertex(dest *duckv1.Destination) *Vertex {
v, ok := g.vertices[makeComparableDestination(dest)]
if !ok {
v = &Vertex{
self: dest,
parent: g,
}
g.vertices[makeComparableDestination(dest)] = v
}

return v
}

func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error {
brokerRef := &duckv1.KReference{
Name: trigger.Spec.Broker,
Expand All @@ -155,7 +142,7 @@ func (g *Graph) AddTrigger(trigger eventingv1.Trigger) error {
to := g.getOrCreateVertex(&trigger.Spec.Subscriber)

//TODO: the transform function should be set according to the trigger filter - there are multiple open issues to address this later
broker.AddEdge(to, triggerDest, NoTransform{}, false)
broker.AddEdge(to, triggerDest, getTransformForTrigger(trigger), false)

if trigger.Spec.Delivery == nil || trigger.Spec.Delivery.DeadLetterSink == nil {
return nil
Expand Down Expand Up @@ -209,3 +196,24 @@ func (g *Graph) AddSubscription(subscription messagingv1.Subscription) error {
return nil

}

func getTransformForTrigger(trigger eventingv1.Trigger) Transform {
if len(trigger.Spec.Filters) == 0 && trigger.Spec.Filter != nil {
return &AttributesFilterTransform{Filter: trigger.Spec.Filter}
}

return NoTransform{}
}

func (g *Graph) getOrCreateVertex(dest *duckv1.Destination) *Vertex {
v, ok := g.vertices[makeComparableDestination(dest)]
if !ok {
v = &Vertex{
self: dest,
parent: g,
}
g.vertices[makeComparableDestination(dest)] = v
}

return v
}
Loading

0 comments on commit 48de042

Please sign in to comment.