Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List applying policies in Parallel #8083

Merged
merged 6 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion pkg/apis/flows/v1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionEventPoliciesReady)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -42,6 +42,10 @@ const (
// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"

// ParallelConditionEventPoliciesReady has status True when applying EventPolicies for this
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
// Parallel are ready or if there are no EventPolicies.
ParallelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -217,3 +221,23 @@ func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
pCondSet.Manage(ps).MarkTrue(ParallelConditionAddressable)
}
}

// MarkEventPoliciesFailed marks the ParallelConditionEventPoliciesReady as False with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

// MarkEventPoliciesUnknown marks the ParallelConditionEventPoliciesReady as Unknown with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

// MarkEventPoliciesTrue marks the ParallelConditionEventPoliciesReady as True.
func (ps *ParallelStatus) MarkEventPoliciesTrue() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionEventPoliciesReady)
}

// MarkEventPoliciesTrueWithReason marks the ParallelConditionEventPoliciesReady as True with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
166 changes: 100 additions & 66 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -117,6 +120,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -144,6 +150,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -435,89 +444,114 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
want bool
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
eventPoliciesReady bool
want bool
}{{
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: true,
}, {
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: true,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, event policies not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: false,
want: false,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := ParallelStatus{}
ps.PropagateChannelStatuses(test.ichannel, test.channels)
ps.PropagateSubscriptionStatuses(test.fsubs, test.subs)

if test.eventPoliciesReady {
ps.MarkEventPoliciesTrue()
} else {
ps.MarkEventPoliciesFailed("", "")
}
got := ps.IsReady()
want := test.want
if want != got {
Expand Down
13 changes: 13 additions & 0 deletions pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (

"k8s.io/client-go/tools/cache"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel"
Expand All @@ -42,12 +44,14 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
eventPolicyInformer := eventpolicy.Get(ctx)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
eventPolicyLister: eventPolicyInformer.Lister(),
}
impl := parallelreconciler.NewImpl(ctx, r)

Expand All @@ -61,5 +65,14 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

parallelGK := v1.SchemeGroupVersion.WithKind("Parallel").GroupKind()
rahulii marked this conversation as resolved.
Show resolved Hide resolved
// Enqueue the Parallel, if we have an EventPolicy which was referencing
// or got updated and now is referencing the Parallel
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
parallelInformer.Informer().GetIndexer(),
parallelGK,
impl.EnqueueKey,
))

return impl
}
11 changes: 10 additions & 1 deletion pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ package parallel
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
Namespace: "knative-eventing",
},
}))
rahulii marked this conversation as resolved.
Show resolved Hide resolved

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
ducklib "knative.dev/eventing/pkg/duck"
Expand All @@ -58,6 +61,8 @@ type Reconciler struct {

// dynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

eventPolicyLister eventingv1alpha1listers.EventPolicyLister
}

// Check that our Reconciler implements parallelreconciler.Interface
Expand All @@ -71,6 +76,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
// 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to
// either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply.
// 3. Rinse and repeat step #2 above for each branch in the list
featureFlags := feature.FromContext(ctx)

if p.Status.BranchStatuses == nil {
p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0)
}
Expand Down Expand Up @@ -137,6 +144,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
return fmt.Errorf("error removing unwanted Subscriptions: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
rahulii marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not update parallel status with EventPolicies: %v", err)
}

return nil
}

Expand Down
Loading
Loading