Skip to content

Commit

Permalink
List applying policies in Parallel (#8083)
Browse files Browse the repository at this point in the history
* lifecycle changes for evebtpolicies in parallel

Signed-off-by: rahulii <r.sawra@gmail.com>

* controller changes for eventpolicies and add tests

Signed-off-by: rahulii <r.sawra@gmail.com>

* fix review comments

Signed-off-by: rahulii <r.sawra@gmail.com>

* fix to review comments

Signed-off-by: rahulii <r.sawra@gmail.com>

* add featurestore to watch configmaps

Signed-off-by: rahulii <r.sawra@gmail.com>

* add featurestore to reconciler impl

Signed-off-by: rahulii <r.sawra@gmail.com>

---------

Signed-off-by: rahulii <r.sawra@gmail.com>
  • Loading branch information
rahulii committed Jul 12, 2024
1 parent a61107c commit e73f2c9
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 73 deletions.
26 changes: 25 additions & 1 deletion pkg/apis/flows/v1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionEventPoliciesReady)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -42,6 +42,10 @@ const (
// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"

// ParallelConditionEventPoliciesReady has status True when applying EventPolicies for this
// Parallel are ready or if there are no EventPolicies.
ParallelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -217,3 +221,23 @@ func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
pCondSet.Manage(ps).MarkTrue(ParallelConditionAddressable)
}
}

// MarkEventPoliciesFailed marks the ParallelConditionEventPoliciesReady as False with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

// MarkEventPoliciesUnknown marks the ParallelConditionEventPoliciesReady as Unknown with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

// MarkEventPoliciesTrue marks the ParallelConditionEventPoliciesReady as True.
func (ps *ParallelStatus) MarkEventPoliciesTrue() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionEventPoliciesReady)
}

// MarkEventPoliciesTrueWithReason marks the ParallelConditionEventPoliciesReady as True with the given reason and message.
func (ps *ParallelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
166 changes: 100 additions & 66 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -117,6 +120,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -144,6 +150,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -435,89 +444,114 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
want bool
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
eventPoliciesReady bool
want bool
}{{
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: true,
}, {
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: true,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, event policies not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: false,
want: false,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := ParallelStatus{}
ps.PropagateChannelStatuses(test.ichannel, test.channels)
ps.PropagateSubscriptionStatuses(test.fsubs, test.subs)

if test.eventPoliciesReady {
ps.MarkEventPoliciesTrue()
} else {
ps.MarkEventPoliciesFailed("", "")
}
got := ps.IsReady()
want := test.want
if want != got {
Expand Down
30 changes: 29 additions & 1 deletion pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel"
Expand All @@ -42,14 +46,29 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
eventPolicyInformer := eventpolicy.Get(ctx)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
eventPolicyLister: eventPolicyInformer.Lister(),
}
impl := parallelreconciler.NewImpl(ctx, r)

var globalResync func()
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync()
}
})
featureStore.WatchConfigs(cmw)

impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -61,5 +80,14 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

parallelGK := v1.Kind("Parallel")
// Enqueue the Parallel, if we have an EventPolicy which was referencing
// or got updated and now is referencing the Parallel
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
parallelInformer.Informer().GetIndexer(),
parallelGK,
impl.EnqueueKey,
))

return impl
}
11 changes: 10 additions & 1 deletion pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ package parallel
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
Namespace: "knative-eventing",
},
}))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
ducklib "knative.dev/eventing/pkg/duck"
Expand All @@ -58,6 +61,8 @@ type Reconciler struct {

// dynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

eventPolicyLister eventingv1alpha1listers.EventPolicyLister
}

// Check that our Reconciler implements parallelreconciler.Interface
Expand All @@ -71,6 +76,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
// 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to
// either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply.
// 3. Rinse and repeat step #2 above for each branch in the list
featureFlags := feature.FromContext(ctx)

if p.Status.BranchStatuses == nil {
p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0)
}
Expand Down Expand Up @@ -137,6 +144,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
return fmt.Errorf("error removing unwanted Subscriptions: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update parallel status with EventPolicies: %v", err)
}

return nil
}

Expand Down
Loading

0 comments on commit e73f2c9

Please sign in to comment.