diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index 1121ee04e..4fbb915ed 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -94,11 +94,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio // Notify emits change event notifications for subscriptions func (s *ConnectService) Notify(n service.Notification) { - s.eventingConfiguration.mu.RLock() - defer s.eventingConfiguration.mu.RUnlock() - for _, send := range s.eventingConfiguration.subs { - send <- n - } + s.eventingConfiguration.emitToAll(n) } func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listener, error) { diff --git a/core/pkg/service/flag-evaluation/eventing.go b/core/pkg/service/flag-evaluation/eventing.go new file mode 100644 index 000000000..79fa89e12 --- /dev/null +++ b/core/pkg/service/flag-evaluation/eventing.go @@ -0,0 +1,36 @@ +package service + +import ( + "sync" + + iservice "github.com/open-feature/flagd/core/pkg/service" +) + +// eventingConfiguration is a wrapper for notification subscriptions +type eventingConfiguration struct { + mu *sync.RWMutex + subs map[interface{}]chan iservice.Notification +} + +func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) { + eventing.mu.Lock() + defer eventing.mu.Unlock() + + eventing.subs[id] = notifyChan +} + +func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) { + eventing.mu.RLock() + defer eventing.mu.RUnlock() + + for _, send := range eventing.subs { + send <- n + } +} + +func (eventing *eventingConfiguration) unSubscribe(id interface{}) { + eventing.mu.Lock() + defer eventing.mu.Unlock() + + delete(eventing.subs, id) +} diff --git a/core/pkg/service/flag-evaluation/eventing_test.go b/core/pkg/service/flag-evaluation/eventing_test.go new file mode 100644 index 000000000..01a3e61d9 --- /dev/null +++ b/core/pkg/service/flag-evaluation/eventing_test.go @@ -0,0 +1,55 @@ +package service + +import ( + "sync" + "testing" + + iservice "github.com/open-feature/flagd/core/pkg/service" + "github.com/stretchr/testify/require" +) + +func TestSubscribe(t *testing.T) { + // given + eventing := &eventingConfiguration{ + subs: make(map[interface{}]chan iservice.Notification), + mu: &sync.RWMutex{}, + } + + idA := "a" + chanA := make(chan iservice.Notification, 1) + + idB := "b" + chanB := make(chan iservice.Notification, 1) + + // when + eventing.subscribe(idA, chanA) + eventing.subscribe(idB, chanB) + + // then + require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association") + require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association") +} + +func TestUnsubscribe(t *testing.T) { + // given + eventing := &eventingConfiguration{ + subs: make(map[interface{}]chan iservice.Notification), + mu: &sync.RWMutex{}, + } + + idA := "a" + chanA := make(chan iservice.Notification, 1) + idB := "b" + chanB := make(chan iservice.Notification, 1) + + // when + eventing.subscribe(idA, chanA) + eventing.subscribe(idB, chanB) + + eventing.unSubscribe(idA) + + // then + require.Empty(t, eventing.subs[idA], + "expected subscription cleared, but value present: %v", eventing.subs[idA]) + require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association") +} diff --git a/core/pkg/service/flag-evaluation/flag_evaluator.go b/core/pkg/service/flag-evaluation/flag_evaluator.go index 9502830c0..6715d70c9 100644 --- a/core/pkg/service/flag-evaluation/flag_evaluator.go +++ b/core/pkg/service/flag-evaluation/flag_evaluator.go @@ -3,7 +3,6 @@ package service import ( "context" "fmt" - "sync" "time" schemaV1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/schema/v1" @@ -25,11 +24,6 @@ type FlagEvaluationService struct { eventingConfiguration *eventingConfiguration } -type eventingConfiguration struct { - mu *sync.RWMutex - subs map[interface{}]chan service.Notification -} - // NewFlagEvaluationService creates a FlagEvaluationService with provided parameters func NewFlagEvaluationService(log *logger.Logger, eval eval.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder, @@ -104,14 +98,9 @@ func (s *FlagEvaluationService) EventStream( stream *connect.ServerStream[schemaV1.EventStreamResponse], ) error { requestNotificationChan := make(chan service.Notification, 1) - s.eventingConfiguration.mu.Lock() - s.eventingConfiguration.subs[req] = requestNotificationChan - s.eventingConfiguration.mu.Unlock() - defer func() { - s.eventingConfiguration.mu.Lock() - delete(s.eventingConfiguration.subs, req) - s.eventingConfiguration.mu.Unlock() - }() + s.eventingConfiguration.subscribe(req, requestNotificationChan) + defer s.eventingConfiguration.unSubscribe(req) + requestNotificationChan <- service.Notification{ Type: service.ProviderReady, }