Skip to content

Commit

Permalink
abstract out eventing logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan committed Apr 12, 2023
1 parent 00326c6 commit 843af97
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 19 deletions.
6 changes: 1 addition & 5 deletions core/pkg/service/flag-evaluation/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuratio

// Notify emits change event notifications for subscriptions
func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.mu.RLock()
defer s.eventingConfiguration.mu.RUnlock()
for _, send := range s.eventingConfiguration.subs {
send <- n
}
s.eventingConfiguration.emitToAll(n)
}

func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listener, error) {
Expand Down
36 changes: 36 additions & 0 deletions core/pkg/service/flag-evaluation/eventing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package service

import (
"sync"

iservice "github.com/open-feature/flagd/core/pkg/service"
)

// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan iservice.Notification
}

func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

eventing.subs[id] = notifyChan
}

func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
eventing.mu.RLock()
defer eventing.mu.RUnlock()

for _, send := range eventing.subs {
send <- n
}
}

func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

delete(eventing.subs, id)
}
55 changes: 55 additions & 0 deletions core/pkg/service/flag-evaluation/eventing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package service

import (
"sync"
"testing"

iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/stretchr/testify/require"
)

func TestSubscribe(t *testing.T) {
// given
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
}

idA := "a"
chanA := make(chan iservice.Notification, 1)

idB := "b"
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)

// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association")
}

func TestUnsubscribe(t *testing.T) {
// given
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
}

idA := "a"
chanA := make(chan iservice.Notification, 1)
idB := "b"
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)

eventing.unSubscribe(idA)

// then
require.Empty(t, eventing.subs[idA],
"expected subscription cleared, but value present: %v", eventing.subs[idA])
require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association")
}
17 changes: 3 additions & 14 deletions core/pkg/service/flag-evaluation/flag_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package service
import (
"context"
"fmt"
"sync"
"time"

schemaV1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/schema/v1"
Expand All @@ -25,11 +24,6 @@ type FlagEvaluationService struct {
eventingConfiguration *eventingConfiguration
}

type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan service.Notification
}

// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
func NewFlagEvaluationService(log *logger.Logger,
eval eval.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
Expand Down Expand Up @@ -104,14 +98,9 @@ func (s *FlagEvaluationService) EventStream(
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.mu.Lock()
s.eventingConfiguration.subs[req] = requestNotificationChan
s.eventingConfiguration.mu.Unlock()
defer func() {
s.eventingConfiguration.mu.Lock()
delete(s.eventingConfiguration.subs, req)
s.eventingConfiguration.mu.Unlock()
}()
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)

requestNotificationChan <- service.Notification{
Type: service.ProviderReady,
}
Expand Down

0 comments on commit 843af97

Please sign in to comment.