Skip to content

Commit

Permalink
test generic broadcast
Browse files Browse the repository at this point in the history
Signed-off-by: Alexy Mantha <alexy@mantha.dev>
  • Loading branch information
alexymantha committed Apr 2, 2024
1 parent e1e467d commit 1503df2
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
5 changes: 4 additions & 1 deletion server/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/argoproj/argo-cd/v2/server/rbacpolicy"
"github.com/argoproj/argo-cd/v2/util/argo"
argoutil "github.com/argoproj/argo-cd/v2/util/argo"
"github.com/argoproj/argo-cd/v2/util/broadcast"
"github.com/argoproj/argo-cd/v2/util/collections"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
Expand Down Expand Up @@ -114,7 +115,9 @@ func NewServer(
enabledNamespaces []string,
) (application.ApplicationServiceServer, AppResourceTreeFn) {
if appBroadcaster == nil {
appBroadcaster = &broadcasterHandler{}
appBroadcaster = broadcast.NewHandler(func(app *appv1.Application, eventType watch.EventType) *appv1.ApplicationWatchEvent {
return &appv1.ApplicationWatchEvent{Application: *app, Type: eventType}
})
}
_, err := appInformer.AddEventHandler(appBroadcaster)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions server/applicationset/applicationset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/server/rbacpolicy"
"github.com/argoproj/argo-cd/v2/util/argo"
"github.com/argoproj/argo-cd/v2/util/broadcast"
"github.com/argoproj/argo-cd/v2/util/collections"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
Expand Down Expand Up @@ -76,7 +77,9 @@ func NewServer(
enabledNamespaces []string,
) applicationset.ApplicationSetServiceServer {
if appsetBroadcaster == nil {
appsetBroadcaster = &broadcasterHandler{}
appsetBroadcaster = broadcast.NewHandler(func(appset *v1alpha1.ApplicationSet, eventType watch.EventType) *v1alpha1.ApplicationSetWatchEvent {
return &v1alpha1.ApplicationSetWatchEvent{ApplicationSet: *appset, Type: eventType}
})
}
_, err := appsetInformer.AddEventHandler(appsetBroadcaster)
if err != nil {
Expand Down Expand Up @@ -247,7 +250,9 @@ func (s *Server) Watch(q *applicationset.ApplicationSetWatchQuery, ws applicatio
}

func (s *Server) isApplicationSetPermitted(selector labels.Selector, minVersion int, claims any, appsetName, appsetNs string, projects map[string]bool, a v1alpha1.ApplicationSet) bool {
logCtx := log.WithField("applicationset", appsetName)
if len(projects) > 0 && !projects[a.Spec.Template.Spec.GetProject()] {
logCtx.Debugf("Project %s is not permitted.", a.Spec.Template.Spec.GetProject())
return false
}

Expand All @@ -264,7 +269,7 @@ func (s *Server) isApplicationSetPermitted(selector labels.Selector, minVersion
}

if !s.enf.Enforce(claims, rbacpolicy.ResourceApplicationSets, rbacpolicy.ActionGet, a.RBACName(s.ns)) {
// do not emit appsets user does not have accessing
// do not emit appsets user does not have access
return false
}

Expand Down
101 changes: 101 additions & 0 deletions util/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package broadcast

import (
"sync"

"k8s.io/apimachinery/pkg/watch"
)

type EventFunc[T any, E any] func(content *T, eventType watch.EventType) *E

type subscriber[T any] struct {
ch chan *T
filters []func(*T) bool
}

func (s *subscriber[T]) matches(event *T) bool {
for i := range s.filters {
if !s.filters[i](event) {
return false
}
}
return true
}

// Broadcaster is an interface for broadcasting application informer watch events to multiple subscribers.
type Broadcaster[T any] interface {
Subscribe(ch chan *T, filters ...func(event *T) bool) func()
OnAdd(interface{})
OnUpdate(interface{}, interface{})
OnDelete(interface{})
}

type broadcasterHandler[T any, E any] struct {
lock sync.Mutex
subscribers []*subscriber[E]
eventFunc EventFunc[T, E]
}

func (b *broadcasterHandler[T, E]) notify(event *E) {
// Make a local copy of b.subscribers, then send channel events outside the lock,
// to avoid data race on b.subscribers changes
subscribers := []*subscriber[E]{}
b.lock.Lock()
subscribers = append(subscribers, b.subscribers...)
b.lock.Unlock()

for _, s := range subscribers {
if s.matches(event) {
select {
case s.ch <- event:
default:
// drop event if cannot send right away
// log.WithField("applicationset", event.ApplicationSet.Name).Warn("unable to send event notification")
}
}
}
}

// Subscribe forward application informer watch events to the provided channel.
// The watch events are dropped if no receives are reading events from the channel so the channel must have
// buffer if dropping events is not acceptable.
func (b *broadcasterHandler[T, E]) Subscribe(ch chan *E, filters ...func(event *E) bool) func() {
b.lock.Lock()
defer b.lock.Unlock()
subscriber := &subscriber[E]{ch, filters}
b.subscribers = append(b.subscribers, subscriber)
return func() {
b.lock.Lock()
defer b.lock.Unlock()
for i := range b.subscribers {
if b.subscribers[i] == subscriber {
b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
break
}
}
}
}

func (b *broadcasterHandler[T, E]) OnAdd(obj interface{}) {
if app, ok := obj.(*T); ok {
b.notify(b.eventFunc(app, watch.Added))
}
}

func (b *broadcasterHandler[T, E]) OnUpdate(_, newObj interface{}) {
if app, ok := newObj.(*T); ok {
b.notify(b.eventFunc(app, watch.Modified))
}
}

func (b *broadcasterHandler[T, E]) OnDelete(obj interface{}) {
if app, ok := obj.(*T); ok {
b.notify(b.eventFunc(app, watch.Deleted))
}
}

func NewHandler[T any, E any](eventFunc EventFunc[T, E]) *broadcasterHandler[T, E] {
return &broadcasterHandler[T, E]{
eventFunc: eventFunc,
}
}

0 comments on commit 1503df2

Please sign in to comment.