Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: eventing configuration setup #605

Merged
merged 3 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
return nil, err
}

connectService := &flageval.ConnectService{
Logger: logger.WithFields(
zap.String("component", "service"),
),
Metrics: recorder,
}

// build flag store
s := store.NewFlags()
sources := []string{}
Expand All @@ -105,6 +98,14 @@ func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
}
s.FlagSources = sources

// derive evaluator
evaluator := eval.NewJSONEvaluator(logger, s)
// derive service
connectService := flageval.NewConnectService(
logger.WithFields(zap.String("component", "service")),
evaluator,
recorder)

// build sync providers
syncLogger := logger.WithFields(zap.String("component", "sync"))
iSyncs, err := syncProvidersFromConfig(syncLogger, config.SyncProviders)
Expand All @@ -114,7 +115,7 @@ func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {

return &Runtime{
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: eval.NewJSONEvaluator(logger, s),
Evaluator: evaluator,
Service: connectService,
ServiceConfig: service.Configuration{
Port: config.ServicePort,
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *Runtime) Start() error {
g.Go(func() error {
// Readiness probe rely on the runtime
r.ServiceConfig.ReadinessProbe = r.isReady
return r.Service.Serve(gCtx, r.Evaluator, r.ServiceConfig)
return r.Service.Serve(gCtx, r.ServiceConfig)
})
<-gCtx.Done()
return g.Wait()
Expand Down
59 changes: 34 additions & 25 deletions core/pkg/service/flag-evaluation/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,35 @@ import (
const ErrorPrefix = "FlagdError:"

type ConnectService struct {
Logger *logger.Logger
Eval eval.IEvaluator
Metrics *telemetry.MetricsRecorder
logger *logger.Logger
eval eval.IEvaluator
metrics *telemetry.MetricsRecorder
eventingConfiguration *eventingConfiguration
server *http.Server
metricsServer *http.Server

server *http.Server
metricsServer *http.Server

serverMtx sync.RWMutex
metricsServerMtx sync.RWMutex
}

func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf service.Configuration) error {
s.Eval = eval
s.eventingConfiguration = &eventingConfiguration{
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
// NewConnectService creates a ConnectService with provided parameters
func NewConnectService(
logger *logger.Logger, evaluator eval.IEvaluator, mRecorder *telemetry.MetricsRecorder,
) *ConnectService {
return &ConnectService{
logger: logger,
eval: evaluator,
metrics: mRecorder,
eventingConfiguration: &eventingConfiguration{
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
},
}
}

// Serve serves services with provided configuration options
func (s *ConnectService) Serve(ctx context.Context, svcConf service.Configuration) error {
g, gCtx := errgroup.WithContext(ctx)

g.Go(func() error {
Expand Down Expand Up @@ -81,6 +92,11 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon
return g.Wait()
}

// Notify emits change event notifications for subscriptions
func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.emitToAll(n)
}

func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listener, error) {
var lis net.Listener
var err error
Expand All @@ -95,9 +111,10 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene
return nil, err
}
fes := NewFlagEvaluationService(
s.Logger.WithFields(zap.String("component", "flagservice")),
s.Eval,
s.Metrics,
s.logger.WithFields(zap.String("component", "flagservice")),
s.eval,
s.eventingConfiguration,
s.metrics,
)
path, handler := schemaConnectV1.NewServiceHandler(fes)
mux.Handle(path, handler)
Expand All @@ -113,8 +130,8 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene

metricsMiddleware := metricsmw.NewHTTPMetric(metricsmw.Config{
Service: svcConf.ServiceName,
MetricRecorder: s.Metrics,
Logger: s.Logger,
MetricRecorder: s.metrics,
Logger: s.logger,
HandlerID: "",
})

Expand All @@ -135,20 +152,12 @@ func (s *ConnectService) AddMiddleware(mw middleware.IMiddleware) {
s.server.Handler = mw.Handler(s.server.Handler)
}

func (s *ConnectService) Notify(n service.Notification) {
s.eventingConfiguration.mu.RLock()
defer s.eventingConfiguration.mu.RUnlock()
for _, send := range s.eventingConfiguration.subs {
send <- n
}
}

func (s *ConnectService) startServer(svcConf service.Configuration) error {
lis, err := s.setupServer(svcConf)
if err != nil {
return err
}
s.Logger.Info(fmt.Sprintf("Flag Evaluation listening at %s", lis.Addr()))
s.logger.Info(fmt.Sprintf("Flag Evaluation listening at %s", lis.Addr()))
if svcConf.CertPath != "" && svcConf.KeyPath != "" {
if err := s.server.ServeTLS(
lis,
Expand All @@ -168,7 +177,7 @@ func (s *ConnectService) startServer(svcConf service.Configuration) error {
}

func (s *ConnectService) startMetricsServer(svcConf service.Configuration) error {
s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort))
s.logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort))
s.metricsServerMtx.Lock()
s.metricsServer = &http.Server{
Addr: fmt.Sprintf(":%d", svcConf.MetricsPort),
Expand Down
52 changes: 42 additions & 10 deletions core/pkg/service/flag-evaluation/connect_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, tt.name)
svc := ConnectService{
Logger: logger.NewLogger(nil, false),
Metrics: metricRecorder,
}
svc := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)
serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
return true
Expand All @@ -97,7 +94,7 @@ func TestConnectService_UnixConnection(t *testing.T) {
defer cancel()

go func() {
err := svc.Serve(ctx, eval, serveConf)
err := svc.Serve(ctx, serveConf)
fmt.Println(err)
}()
conn, err := grpc.Dial(
Expand Down Expand Up @@ -140,10 +137,7 @@ func TestAddMiddleware(t *testing.T) {
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")

svc := ConnectService{
Logger: logger.NewLogger(nil, false),
Metrics: metricRecorder,
}
svc := NewConnectService(logger.NewLogger(nil, false), nil, metricRecorder)

serveConf := iservice.Configuration{
ReadinessProbe: func() bool {
Expand All @@ -156,7 +150,7 @@ func TestAddMiddleware(t *testing.T) {
defer cancel()

go func() {
err := svc.Serve(ctx, nil, serveConf)
err := svc.Serve(ctx, serveConf)
fmt.Println(err)
}()

Expand All @@ -175,3 +169,41 @@ func TestAddMiddleware(t *testing.T) {
// verify that the status we return in the mocked middleware
require.Equal(t, http.StatusOK, resp.StatusCode)
}

func TestConnectServiceNotify(t *testing.T) {
// given
ctrl := gomock.NewController(t)
eval := mock.NewMockIEvaluator(ctrl)

exp := metric.NewManualReader()
rs := resource.NewWithAttributes("testSchema")
metricRecorder := telemetry.NewOTelRecorder(exp, rs, "my-exporter")

service := NewConnectService(logger.NewLogger(nil, false), eval, metricRecorder)

sChan := make(chan iservice.Notification, 1)
eventing := service.eventingConfiguration
eventing.subs["key"] = sChan

// notification type
ofType := iservice.ConfigurationChange

// emit notification in routine
go func() {
service.Notify(iservice.Notification{
Type: ofType,
Data: map[string]interface{}{},
})
}()

// wait for notification
timeout, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()

select {
case n := <-sChan:
require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
case <-timeout.Done():
t.Error("timeout while waiting for notifications")
}
}
36 changes: 36 additions & 0 deletions core/pkg/service/flag-evaluation/eventing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package service

import (
"sync"

iservice "github.com/open-feature/flagd/core/pkg/service"
)

// eventingConfiguration is a wrapper for notification subscriptions
type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan iservice.Notification
}

func (eventing *eventingConfiguration) subscribe(id interface{}, notifyChan chan iservice.Notification) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

eventing.subs[id] = notifyChan
}

func (eventing *eventingConfiguration) emitToAll(n iservice.Notification) {
eventing.mu.RLock()
defer eventing.mu.RUnlock()

for _, send := range eventing.subs {
send <- n
}
}

func (eventing *eventingConfiguration) unSubscribe(id interface{}) {
eventing.mu.Lock()
defer eventing.mu.Unlock()

delete(eventing.subs, id)
}
55 changes: 55 additions & 0 deletions core/pkg/service/flag-evaluation/eventing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package service

import (
"sync"
"testing"

iservice "github.com/open-feature/flagd/core/pkg/service"
"github.com/stretchr/testify/require"
)

func TestSubscribe(t *testing.T) {
// given
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
}

idA := "a"
chanA := make(chan iservice.Notification, 1)

idB := "b"
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)

// then
require.Equal(t, chanA, eventing.subs[idA], "incorrect subscription association")
require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association")
}

func TestUnsubscribe(t *testing.T) {
// given
eventing := &eventingConfiguration{
subs: make(map[interface{}]chan iservice.Notification),
mu: &sync.RWMutex{},
}

idA := "a"
chanA := make(chan iservice.Notification, 1)
idB := "b"
chanB := make(chan iservice.Notification, 1)

// when
eventing.subscribe(idA, chanA)
eventing.subscribe(idB, chanB)

eventing.unSubscribe(idA)

// then
require.Empty(t, eventing.subs[idA],
"expected subscription cleared, but value present: %v", eventing.subs[idA])
require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association")
}
31 changes: 9 additions & 22 deletions core/pkg/service/flag-evaluation/flag_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package service
import (
"context"
"fmt"
"sync"
"time"

schemaV1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/schema/v1"
Expand All @@ -25,22 +24,15 @@ type FlagEvaluationService struct {
eventingConfiguration *eventingConfiguration
}

type eventingConfiguration struct {
mu *sync.RWMutex
subs map[interface{}]chan service.Notification
}

// NewFlagEvaluationService creates a FlagEvaluationService with provided parameters
func NewFlagEvaluationService(log *logger.Logger,
eval eval.IEvaluator, metricsRecorder *telemetry.MetricsRecorder,
eval eval.IEvaluator, eventingCfg *eventingConfiguration, metricsRecorder *telemetry.MetricsRecorder,
) *FlagEvaluationService {
return &FlagEvaluationService{
logger: log,
eval: eval,
metrics: metricsRecorder,
eventingConfiguration: &eventingConfiguration{
subs: make(map[interface{}]chan service.Notification),
mu: &sync.RWMutex{},
},
logger: log,
eval: eval,
metrics: metricsRecorder,
eventingConfiguration: eventingCfg,
}
}

Expand Down Expand Up @@ -106,14 +98,9 @@ func (s *FlagEvaluationService) EventStream(
stream *connect.ServerStream[schemaV1.EventStreamResponse],
) error {
requestNotificationChan := make(chan service.Notification, 1)
s.eventingConfiguration.mu.Lock()
s.eventingConfiguration.subs[req] = requestNotificationChan
s.eventingConfiguration.mu.Unlock()
defer func() {
s.eventingConfiguration.mu.Lock()
delete(s.eventingConfiguration.subs, req)
s.eventingConfiguration.mu.Unlock()
}()
s.eventingConfiguration.subscribe(req, requestNotificationChan)
defer s.eventingConfiguration.unSubscribe(req)

requestNotificationChan <- service.Notification{
Type: service.ProviderReady,
}
Expand Down
Loading