From b911ab718cd761f03b062b53159f8d57b85f9527 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 30 Jun 2023 08:54:25 -0700 Subject: [PATCH] 1 to n support and eventing completeness Signed-off-by: Kavindu Dodanduwa --- go.mod | 3 +- go.sum | 2 - pkg/openfeature/api.go | 80 ++- pkg/openfeature/event_executor.go | 239 ++++--- pkg/openfeature/event_executor_test.go | 839 +++++++++++++++++++++++-- pkg/openfeature/openfeature.go | 8 +- pkg/openfeature/openfeature_test.go | 89 ++- pkg/openfeature/provider.go | 5 +- pkg/openfeature/provider_mock_test.go | 6 +- pkg/openfeature/util_test.go | 13 +- 10 files changed, 1119 insertions(+), 165 deletions(-) diff --git a/go.mod b/go.mod index f8e766ba..cddf6965 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/golang/mock v1.6.0 github.com/open-feature/go-sdk-contrib/providers/flagd v0.1.7 github.com/open-feature/go-sdk-contrib/tests/flagd v1.1.0 - golang.org/x/sync v0.1.0 + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 golang.org/x/text v0.10.0 ) @@ -47,7 +47,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/net v0.6.0 // indirect golang.org/x/sys v0.5.0 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 96be3bc5..dff79853 100644 --- a/go.sum +++ b/go.sum @@ -388,8 +388,6 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/openfeature/api.go b/pkg/openfeature/api.go index d58da705..0c2b4820 100644 --- a/pkg/openfeature/api.go +++ b/pkg/openfeature/api.go @@ -6,6 +6,7 @@ import ( "github.com/go-logr/logr" "github.com/open-feature/go-sdk/pkg/openfeature/internal" + "golang.org/x/exp/maps" ) // evaluationAPI wraps OpenFeature evaluation API functionalities @@ -16,7 +17,7 @@ type evaluationAPI struct { evalCtx EvaluationContext logger logr.Logger mu sync.RWMutex - eventExecutor + eventExecutor *eventExecutor } // newEvaluationAPI is a helper to generate an API. Used internally @@ -46,10 +47,10 @@ func (api *evaluationAPI) setProvider(provider FeatureProvider) error { // Initialize new default provider and shutdown the old one // Provider update must be non-blocking, hence initialization & shutdown happens concurrently oldProvider := api.defaultProvider - api.initAndShutdown(provider, oldProvider) api.defaultProvider = provider - err := api.registerDefaultProvider(provider) + api.initNewAndShutdownOld(provider, oldProvider) + err := api.eventExecutor.registerDefaultProvider(provider) if err != nil { return err } @@ -76,10 +77,11 @@ func (api *evaluationAPI) setNamedProvider(clientName string, provider FeaturePr // Initialize new default provider and shutdown the old one // Provider update must be non-blocking, hence initialization & shutdown happens concurrently - api.initAndShutdown(provider, api.namedProviders[clientName]) + oldProvider := api.namedProviders[clientName] api.namedProviders[clientName] = provider - err := api.registerNamedEventingProvider(clientName, provider) + api.initNewAndShutdownOld(provider, oldProvider) + err := api.eventExecutor.registerNamedEventingProvider(clientName, provider) if err != nil { return err } @@ -125,6 +127,9 @@ func (api *evaluationAPI) addHooks(hooks ...Hook) { } func (api *evaluationAPI) shutdown() { + api.mu.Lock() + defer api.mu.Unlock() + v, ok := api.defaultProvider.(StateHandler) if ok { v.Shutdown() @@ -161,21 +166,56 @@ func (api *evaluationAPI) forTransaction(clientName string) (FeatureProvider, [] return provider, api.hks, api.evalCtx } -// initAndShutdown is a helper to initialise new FeatureProvider and shutdown old FeatureProvider. -// Operation happens concurrently. -func (api *evaluationAPI) initAndShutdown(newProvider FeatureProvider, oldProvider FeatureProvider) { - go func() { - v, ok := newProvider.(StateHandler) - if ok { - v.Init(api.evalCtx) - } - - // oldProvider can be nil - if oldProvider != nil { - v, ok = oldProvider.(StateHandler) - if ok { - v.Shutdown() +// initNewAndShutdownOld is a helper to initialise new FeatureProvider and shutdown the old FeatureProvider. +// Operations happen concurrently. +func (api *evaluationAPI) initNewAndShutdownOld(newProvider FeatureProvider, oldProvider FeatureProvider) { + v, ok := newProvider.(StateHandler) + if ok && v.Status() == NotReadyState { + go func(provider FeatureProvider, stateHandler StateHandler, evalCtx EvaluationContext, eventChan chan eventPayload) { + err := stateHandler.Init(evalCtx) + if err != nil { + eventChan <- eventPayload{ + Event{ + ProviderName: provider.Metadata().Name, + EventType: ProviderError, + ProviderEventDetails: ProviderEventDetails{}, + }, provider, + } + } else { + eventChan <- eventPayload{ + Event{ + ProviderName: provider.Metadata().Name, + EventType: ProviderReady, + ProviderEventDetails: ProviderEventDetails{}, + }, provider, + } } + }(newProvider, v, api.evalCtx, api.eventExecutor.eventChan) + } + + v, ok = oldProvider.(StateHandler) + + // oldProvider can be nil or without state handling capability + if oldProvider == nil || !ok { + return + } + + // check for multiple bindings + if oldProvider == api.defaultProvider || contains(oldProvider, maps.Values(api.namedProviders)) { + return + } + + go func(forShutdown StateHandler) { + forShutdown.Shutdown() + }(v) +} + +func contains(provider FeatureProvider, in []FeatureProvider) bool { + for _, p := range in { + if provider == p { + return true } - }() + } + + return false } diff --git a/pkg/openfeature/event_executor.go b/pkg/openfeature/event_executor.go index 66ae3421..9be015df 100644 --- a/pkg/openfeature/event_executor.go +++ b/pkg/openfeature/event_executor.go @@ -6,26 +6,39 @@ import ( "time" "github.com/go-logr/logr" + "golang.org/x/exp/maps" ) // event executor is a registry to connect API and Client event handlers to Providers +// eventExecutor handles events emitted from FeatureProvider. It follows a pub-sub model based on channels. +// Emitted events are written to eventChan. This model is chosen so that events can be triggered from subscribed +// feature provider as well as from API(ex:- for initialization events). +// Usage of channels help with concurrency and adhere to the principal of sharing memory by communication. type eventExecutor struct { - defaultProviderReference *providerReference - namedProviderReference map[string]*providerReference + defaultProviderReference providerReference + namedProviderReference map[string]providerReference + activeSubscriptions []providerReference apiRegistry map[EventType][]EventCallback scopedRegistry map[string]scopedCallback logger logr.Logger + eventChan chan eventPayload + once sync.Once mu sync.Mutex } -func newEventExecutor(logger logr.Logger) eventExecutor { - return eventExecutor{ - namedProviderReference: map[string]*providerReference{}, +func newEventExecutor(logger logr.Logger) *eventExecutor { + executor := eventExecutor{ + namedProviderReference: map[string]providerReference{}, + activeSubscriptions: []providerReference{}, apiRegistry: map[EventType][]EventCallback{}, scopedRegistry: map[string]scopedCallback{}, logger: logger, + eventChan: make(chan eventPayload, 1), } + + executor.startEventListener() + return &executor } // scopedCallback is a helper struct to hold client name associated callbacks. @@ -35,6 +48,11 @@ type scopedCallback struct { callbacks map[EventType][]EventCallback } +type eventPayload struct { + event Event + handler FeatureProvider +} + func newScopedCallback(client string) scopedCallback { return scopedCallback{ scope: client, @@ -42,12 +60,11 @@ func newScopedCallback(client string) scopedCallback { } } +// providerReference is a helper struct to store FeatureProvider with EventHandler capability along with their +// shutdown semaphore type providerReference struct { - eventHandler *EventHandler - metadata Metadata - clientNameAssociation string - isDefault bool - shutdownSemaphore chan interface{} + featureProvider FeatureProvider + shutdownSemaphore chan interface{} } // updateLogger updates the executor's logger @@ -68,6 +85,8 @@ func (e *eventExecutor) registerApiHandler(t EventType, c EventCallback) { } else { e.apiRegistry[t] = append(e.apiRegistry[t], c) } + + e.emitOnRegistration(e.defaultProviderReference, t, c) } // removeApiHandler removes an API(global) level handler @@ -108,29 +127,13 @@ func (e *eventExecutor) registerClientHandler(clientName string, t EventType, c registry.callbacks[t] = append(registry.callbacks[t], c) } - // fulfil spec requirement to fire ready events if associated provider is ready - if t != ProviderReady { - return - } - - provider, ok := e.namedProviderReference[clientName] + reference, ok := e.namedProviderReference[clientName] if !ok { - return + // fallback to default + reference = e.defaultProviderReference } - s, ok := (*provider.eventHandler).(StateHandler) - if !ok { - return - } - - if s.Status() == ReadyState { - (*c)(EventDetails{ - providerName: provider.metadata.Name, - ProviderEventDetails: ProviderEventDetails{ - Message: "provider is at ready state", - }, - }) - } + e.emitOnRegistration(reference, t, c) } // removeClientHandler removes a client level handler @@ -159,30 +162,45 @@ func (e *eventExecutor) removeClientHandler(name string, t EventType, c EventCal e.scopedRegistry[name].callbacks[t] = entrySlice } -// registerDefaultProvider registers the default FeatureProvider and remove the old default provider if available -func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error { - e.mu.Lock() - defer e.mu.Unlock() +// emitOnRegistration fulfils the spec requirement to fire ready events if associated provider is ready +func (e *eventExecutor) emitOnRegistration(providerReference providerReference, t EventType, c EventCallback) { + if t != ProviderReady { + return + } - v, ok := provider.(EventHandler) + s, ok := (providerReference.featureProvider).(StateHandler) if !ok { - return nil + // not a state handler, hence ignore state emitting + return } - oldProvider := e.defaultProviderReference + if s.Status() == ReadyState { + (*c)(EventDetails{ + providerName: (providerReference.featureProvider).Metadata().Name, + ProviderEventDetails: ProviderEventDetails{ + Message: "provider is at ready state", + }, + }) + } +} + +// registerDefaultProvider registers the default FeatureProvider and remove the old default provider if available +func (e *eventExecutor) registerDefaultProvider(provider FeatureProvider) error { + e.mu.Lock() + defer e.mu.Unlock() // register shutdown semaphore for new default provider sem := make(chan interface{}) - newProvider := &providerReference{ - eventHandler: &v, - metadata: provider.Metadata(), - isDefault: true, + newProvider := providerReference{ + featureProvider: provider, shutdownSemaphore: sem, } + oldProvider := e.defaultProviderReference e.defaultProviderReference = newProvider - return e.listenAndShutdown(newProvider, oldProvider) + + return e.startListeningAndShutdownOld(newProvider, oldProvider) } // registerNamedEventingProvider registers a named FeatureProvider and remove event listener for old named provider @@ -190,56 +208,92 @@ func (e *eventExecutor) registerNamedEventingProvider(associatedClient string, p e.mu.Lock() defer e.mu.Unlock() - v, ok := provider.(EventHandler) - if !ok { - return nil - } - - oldProvider := e.namedProviderReference[associatedClient] - // register shutdown semaphore for new named provider sem := make(chan interface{}) - newProvider := &providerReference{ - eventHandler: &v, - clientNameAssociation: associatedClient, - metadata: provider.Metadata(), - shutdownSemaphore: sem, + newProvider := providerReference{ + featureProvider: provider, + shutdownSemaphore: sem, } + oldProvider := e.namedProviderReference[associatedClient] e.namedProviderReference[associatedClient] = newProvider - return e.listenAndShutdown(newProvider, oldProvider) + + return e.startListeningAndShutdownOld(newProvider, oldProvider) } -// listenAndShutdown is a helper to start concurrent listening to new provider events and invoke shutdown hook of the -// old provider -func (e *eventExecutor) listenAndShutdown(newProvider *providerReference, oldReference *providerReference) error { - go func(newProvider *providerReference, oldReference *providerReference) { - for { - select { - case event := <-(*newProvider.eventHandler).EventChannel(): - e.triggerEvent(event, newProvider.clientNameAssociation, newProvider.isDefault) - case <-newProvider.shutdownSemaphore: +// startListeningAndShutdownOld is a helper to start concurrent listening to new provider events and invoke shutdown +// hook of the old provider if it's not bound by another subscription +func (e *eventExecutor) startListeningAndShutdownOld(newProvider providerReference, oldReference providerReference) error { + + // check if this provider already actively handled - 1:N binding capability + if !isRunning(newProvider, e.activeSubscriptions) { + e.activeSubscriptions = append(e.activeSubscriptions, newProvider) + + go func() { + v, ok := newProvider.featureProvider.(EventHandler) + if !ok { return } - } - }(newProvider, oldReference) + + // event handling of the new feature provider + for { + select { + case event := <-v.EventChannel(): + e.eventChan <- eventPayload{ + event: event, + handler: newProvider.featureProvider, + } + case <-newProvider.shutdownSemaphore: + return + } + } + }() + } // shutdown old provider handling - if oldReference == nil { + + // check if this provider is still bound - 1:N binding capability + if isBound(oldReference, e.defaultProviderReference, maps.Values(e.namedProviderReference)) { return nil } + // drop from active references + for i, r := range e.activeSubscriptions { + if r == oldReference { + e.activeSubscriptions = append(e.activeSubscriptions[:i], e.activeSubscriptions[i+1:]...) + } + } + + _, ok := oldReference.featureProvider.(EventHandler) + if !ok { + // no shutdown for non event handling provider + return nil + } + + // avoid shutdown lockouts select { case oldReference.shutdownSemaphore <- "": return nil case <-time.After(200 * time.Millisecond): - return fmt.Errorf("event handler timeout waiting for handler shutdown") + return fmt.Errorf("old event handler %s timeout waiting for handler shutdown", + oldReference.featureProvider.Metadata().Name) } } +// startEventListener trigger the event listening of this executor +func (e *eventExecutor) startEventListener() { + e.once.Do(func() { + go func() { + for payload := range e.eventChan { + e.triggerEvent(payload.event, payload.handler) + } + }() + }) +} + // triggerEvent performs the actual event handling -func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, isDefault bool) { +func (e *eventExecutor) triggerEvent(event Event, handler FeatureProvider) { e.mu.Lock() defer e.mu.Unlock() @@ -248,20 +302,25 @@ func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, e.executeHandler(*c, event) } - // then run Client handlers for name association + // then run client handlers + var nameAssociations []string + for name, reference := range e.namedProviderReference { + if reference.featureProvider == handler { + nameAssociations = append(nameAssociations, name) + } + } - // first direct associates - associateClientRegistry := e.scopedRegistry[clientNameAssociation] - for _, c := range associateClientRegistry.callbacks[event.EventType] { - e.executeHandler(*c, event) + for _, nameAssociation := range nameAssociations { + for _, c := range e.scopedRegistry[nameAssociation].callbacks[event.EventType] { + e.executeHandler(*c, event) + } } - if !isDefault { + if e.defaultProviderReference.featureProvider != handler { return } - // handling the default provider - invoke default provider bound handlers by filtering - + // handling the default provider - invoke default provider bound (no provider associated) handlers by filtering var defaultHandlers []EventCallback for clientName, registry := range e.scopedRegistry { @@ -294,3 +353,29 @@ func (e *eventExecutor) executeHandler(f func(details EventDetails), event Event }) }() } + +// isRunning is a helper till we bump to the latest go version with slices.contains support +func isRunning(provider providerReference, activeProviders []providerReference) bool { + for _, activeProvider := range activeProviders { + if provider.featureProvider == activeProvider.featureProvider { + return true + } + } + + return false +} + +// isRunning is a helper to check if given provider is already in use +func isBound(provider providerReference, defaultProvider providerReference, namedProviders []providerReference) bool { + if provider.featureProvider == defaultProvider.featureProvider { + return true + } + + for _, namedProvider := range namedProviders { + if provider.featureProvider == namedProvider.featureProvider { + return true + } + } + + return false +} diff --git a/pkg/openfeature/event_executor_test.go b/pkg/openfeature/event_executor_test.go index 245a8dc4..47083a9c 100644 --- a/pkg/openfeature/event_executor_test.go +++ b/pkg/openfeature/event_executor_test.go @@ -1,6 +1,7 @@ package openfeature import ( + "errors" "reflect" "testing" "time" @@ -21,27 +22,6 @@ func init() { // with a provider event details payload. func TestEventHandler_RegisterUnregisterEventProvider(t *testing.T) { - t.Run("Ignored non-eventing providers", func(t *testing.T) { - executor := newEventExecutor(logger) - err := executor.registerDefaultProvider(NoopProvider{}) - if err != nil { - t.Fatal(err) - } - - if executor.defaultProviderReference != nil { - t.Errorf("implementation should ignore non eventing provider") - } - - err = executor.registerNamedEventingProvider("name", NoopProvider{}) - if err != nil { - t.Fatal(err) - } - - if len(executor.namedProviderReference) != 0 { - t.Fatalf("implementation should ignore non eventing provider") - } - }) - t.Run("Accepts addition of eventing providers", func(t *testing.T) { eventingImpl := &ProviderEventing{} @@ -59,10 +39,6 @@ func TestEventHandler_RegisterUnregisterEventProvider(t *testing.T) { t.Fatal(err) } - if executor.defaultProviderReference == nil { - t.Errorf("implementation should register eventing provider") - } - err = executor.registerNamedEventingProvider("name", eventingProvider) if err != nil { t.Fatal(err) @@ -276,6 +252,18 @@ func TestEventHandler_clientAssociation(t *testing.T) { // Requirement 5.2.5 If a handler function terminates abnormally, other handler functions MUST run. func TestEventHandler_ErrorHandling(t *testing.T) { + eventing := &ProviderEventing{ + c: make(chan Event, 1), + } + + provider := struct { + FeatureProvider + EventHandler + }{ + NoopProvider{}, + eventing, + } + errorCallback := func(e EventDetails) { panic("callback panic") } @@ -291,25 +279,29 @@ func TestEventHandler_ErrorHandling(t *testing.T) { } executor := newEventExecutor(logger) + err := executor.registerDefaultProvider(provider) + if err != nil { + t.Fatal(err) + } // api level handlers executor.registerApiHandler(ProviderReady, &errorCallback) executor.registerApiHandler(ProviderReady, &successAPICallback) // provider association - provider := "providerA" + providerName := "providerA" // client level handlers - executor.registerClientHandler(provider, ProviderReady, &errorCallback) - executor.registerClientHandler(provider, ProviderReady, &successClientCallback) + executor.registerClientHandler(providerName, ProviderReady, &errorCallback) + executor.registerClientHandler(providerName, ProviderReady, &successClientCallback) // trigger events manually go func() { - executor.triggerEvent(Event{ - ProviderName: provider, + eventing.Invoke(Event{ + ProviderName: providerName, EventType: ProviderReady, ProviderEventDetails: ProviderEventDetails{}, - }, "", true) + }) }() select { @@ -327,41 +319,794 @@ func TestEventHandler_ErrorHandling(t *testing.T) { } } +// Requirement 5.3.1 If the provider's initialize function terminates normally, PROVIDER_READY handlers MUST run. +func TestEventHandler_InitOfProvider(t *testing.T) { + t.Run("for default provider in global handler scope", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + AddHandler(ProviderReady, &callback) + err := SetProvider(provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for default provider with unassociated client handler", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("clientWithNoProviderAssociation", ProviderReady, &callback) + err := SetProvider(provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for named provider in client scope", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("someClient", ProviderReady, &callback) + err := SetNamedProvider("someClient", provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("no callback for named provider with no associations", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("provider", ProviderReady, &callback) + err := SetNamedProvider("someClient", provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + t.Errorf("event must have not emitted") + case <-time.After(200 * time.Millisecond): + break + } + }) + +} + +// Requirement 5.3.2 If the provider's initialize function terminates abnormally, PROVIDER_ERROR handlers MUST run. +func TestEventHandler_InitOfProviderError(t *testing.T) { + t.Run("for default provider in global scope", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + initF: func(e EvaluationContext) error { + return errors.New("initialization failed") + }, + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + AddHandler(ProviderError, &callback) + err := SetProvider(provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for default provider with unassociated client handler", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + initF: func(e EvaluationContext) error { + return errors.New("initialization failed") + }, + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("clientWithNoProviderAssociation", ProviderError, &callback) + err := SetProvider(provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for named provider in client scope", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + initF: func(e EvaluationContext) error { + return errors.New("initialization failed") + }, + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("someClient", ProviderError, &callback) + err := SetNamedProvider("someClient", provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("no callback for named provider with no associations", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // provider + provider := struct { + FeatureProvider + StateHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + initF: func(e EvaluationContext) error { + return errors.New("initialization failed") + }, + State: NotReadyState, + }, + } + + // callback + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + addClientHandler("provider", ProviderError, &callback) + err := SetNamedProvider("someClient", provider) + if err != nil { + t.Fatal(err) + } + + select { + case <-rsp: + t.Errorf("event must have not emitted") + case <-time.After(200 * time.Millisecond): + break + } + }) + +} + // Requirement 5.3.3 PROVIDER_READY handlers attached after the provider is already in a ready state MUST run immediately. func TestEventHandler_ProviderReadiness(t *testing.T) { - readyEventingProvider := struct { + t.Run("for api level under default provider", func(t *testing.T) { + readyEventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: ReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(readyEventingProvider) + if err != nil { + t.Fatal(err) + } + + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + executor.registerApiHandler(ProviderReady, &callback) + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for name associated handler", func(t *testing.T) { + readyEventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: ReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + clientAssociation := "clientA" + err := executor.registerNamedEventingProvider(clientAssociation, readyEventingProvider) + if err != nil { + t.Fatal(err) + } + + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + executor.registerClientHandler(clientAssociation, ProviderReady, &callback) + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("for unassociated handler from default", func(t *testing.T) { + readyEventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: ReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(readyEventingProvider) + if err != nil { + t.Fatal(err) + } + + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + executor.registerClientHandler("someClient", ProviderReady, &callback) + + select { + case <-rsp: + break + case <-time.After(200 * time.Millisecond): + t.Errorf("timedout waiting for ready state callback, but got none") + } + }) + + t.Run("no event if provider is not ready", func(t *testing.T) { + notReadyEventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(notReadyEventingProvider) + if err != nil { + t.Fatal(err) + } + + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + executor.registerClientHandler("someClient", ProviderReady, &callback) + + select { + case <-rsp: + t.Errorf("event must not emit for non ready provider") + case <-time.After(200 * time.Millisecond): + break + } + }) + + t.Run("no event if subscribed for some other event", func(t *testing.T) { + readyEventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: ReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(readyEventingProvider) + if err != nil { + t.Fatal(err) + } + + rsp := make(chan EventDetails, 1) + callback := func(e EventDetails) { + rsp <- e + } + + executor.registerClientHandler("someClient", ProviderError, &callback) + + select { + case <-rsp: + t.Errorf("event must not emit for this handler") + case <-time.After(200 * time.Millisecond): + break + } + }) +} + +// non-spec bound validations + +func TestEventHandler_multiSubs(t *testing.T) { + eventingImpl := &ProviderEventing{ + c: make(chan Event, 1), + } + + eventingProvider := struct { FeatureProvider - StateHandler EventHandler }{ NoopProvider{}, - &stateHandlerForTests{ - State: ReadyState, - }, - &ProviderEventing{}, + eventingImpl, } - executor := newEventExecutor(logger) + eventingImplOther := &ProviderEventing{ + c: make(chan Event, 1), + } - clientAssociation := "clientA" - err := executor.registerNamedEventingProvider(clientAssociation, readyEventingProvider) + eventingProvideOther := struct { + FeatureProvider + EventHandler + }{ + NoopProvider{}, + eventingImplOther, + } + + // register for default and named providers + err := SetProvider(eventingProvider) if err != nil { t.Fatal(err) } - rsp := make(chan EventDetails, 1) - successAPICallback := func(e EventDetails) { - rsp <- e + err = SetNamedProvider("clientA", eventingProvideOther) + if err != nil { + t.Fatal(err) + } + + err = SetNamedProvider("clientB", eventingProvideOther) + if err != nil { + t.Fatal(err) } - executor.registerClientHandler(clientAssociation, ProviderReady, &successAPICallback) + // register global and scoped callbacks + rspGlobal := make(chan EventDetails, 1) + globalF := func(e EventDetails) { + rspGlobal <- e + } + AddHandler(ProviderStale, &globalF) + + rspClientA := make(chan EventDetails, 1) + clientA := func(e EventDetails) { + rspClientA <- e + } + addClientHandler("clientA", ProviderStale, &clientA) + + rspClientB := make(chan EventDetails, 1) + clientB := func(e EventDetails) { + rspClientB <- e + } + addClientHandler("clientB", ProviderStale, &clientB) + + emitDone := make(chan interface{}) + eventCount := 5 + + // invoke events + go func() { + for i := 0; i < eventCount; i++ { + eventingImpl.Invoke(Event{ + ProviderName: "provider", + EventType: ProviderStale, + ProviderEventDetails: ProviderEventDetails{}, + }) + + eventingImplOther.Invoke(Event{ + ProviderName: "providerOther", + EventType: ProviderStale, + ProviderEventDetails: ProviderEventDetails{}, + }) + + time.Sleep(100 * time.Millisecond) + } + + emitDone <- "" + }() + + // make sure events are received and count them + globalEvents := make(chan string, 10) + go func() { + for rsp := range rspGlobal { + globalEvents <- rsp.providerName + } + }() + + clientAEvents := make(chan string, 10) + go func() { + for rsp := range rspClientA { + if rsp.providerName != "providerOther" { + t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.providerName) + } + + clientAEvents <- rsp.providerName + } + }() + + clientBEvents := make(chan string, 10) + go func() { + for rsp := range rspClientB { + if rsp.providerName != "providerOther" { + t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.providerName) + } + + clientBEvents <- rsp.providerName + } + }() select { - case <-rsp: + case <-time.After(1 * time.Minute): + t.Errorf("test timedout") + case <-emitDone: break - case <-time.After(200 * time.Millisecond): - t.Errorf("timedout waiting for ready state callback, but got none") } + + if len(globalEvents) != eventCount*2 || len(clientAEvents) != eventCount || len(clientBEvents) != eventCount { + t.Error("event counts does not match with emitted event count") + } +} + +func TestEventHandler_1ToNMapping(t *testing.T) { + t.Run("provider eventing must be subscribed only once", func(t *testing.T) { + eventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{}, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(eventingProvider) + if err != nil { + t.Fatal(err) + } + + if len(executor.activeSubscriptions) != 1 && + executor.activeSubscriptions[0].featureProvider != eventingProvider.FeatureProvider { + t.Fatal("provider not added to active provider subscriptions") + } + + err = executor.registerNamedEventingProvider("clientA", eventingProvider) + if err != nil { + t.Fatal(err) + } + + err = executor.registerNamedEventingProvider("clientB", eventingProvider) + if err != nil { + t.Fatal(err) + } + + if len(executor.activeSubscriptions) != 1 { + t.Fatal("multiple provided in active subscriptions") + } + }) + + t.Run("avoid unsubscribe from active providers - default and named", func(t *testing.T) { + eventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + executor := newEventExecutor(logger) + + err := executor.registerDefaultProvider(eventingProvider) + if err != nil { + t.Fatal(err) + } + + err = executor.registerNamedEventingProvider("clientA", eventingProvider) + if err != nil { + t.Fatal(err) + } + + overridingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + err = executor.registerNamedEventingProvider("clientA", overridingProvider) + if err != nil { + t.Fatal(err) + } + + if len(executor.activeSubscriptions) != 2 { + t.Fatal("error with active provider subscriptions") + } + }) + + t.Run("avoid unsubscribe from active providers - named only", func(t *testing.T) { + eventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + executor := newEventExecutor(logger) + + err := executor.registerNamedEventingProvider("clientA", eventingProvider) + if err != nil { + t.Fatal(err) + } + + err = executor.registerNamedEventingProvider("clientB", eventingProvider) + if err != nil { + t.Fatal(err) + } + + overridingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + err = executor.registerNamedEventingProvider("clientA", overridingProvider) + if err != nil { + t.Fatal(err) + } + + if len(executor.activeSubscriptions) != 2 { + t.Fatal("error with active provider subscriptions") + } + }) + + t.Run("unbound providers must be removed from active subscriptions", func(t *testing.T) { + eventingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + executor := newEventExecutor(logger) + + err := executor.registerNamedEventingProvider("clientA", eventingProvider) + if err != nil { + t.Fatal(err) + } + + overridingProvider := struct { + FeatureProvider + StateHandler + EventHandler + }{ + NoopProvider{}, + &stateHandlerForTests{ + State: NotReadyState, + }, + &ProviderEventing{ + c: make(chan Event), + }, + } + + err = executor.registerNamedEventingProvider("clientA", overridingProvider) + if err != nil { + t.Fatal(err) + } + + if len(executor.activeSubscriptions) != 1 && + executor.activeSubscriptions[0].featureProvider != overridingProvider.FeatureProvider { + t.Fatal("error with active provider subscriptions") + } + }) } // Contract tests - registration & removal diff --git a/pkg/openfeature/openfeature.go b/pkg/openfeature/openfeature.go index 1b875f7b..77d41d7a 100644 --- a/pkg/openfeature/openfeature.go +++ b/pkg/openfeature/openfeature.go @@ -51,22 +51,22 @@ func AddHooks(hooks ...Hook) { // AddHandler allows to add API level event handler func AddHandler(eventType EventType, callback EventCallback) { - api.registerApiHandler(eventType, callback) + api.eventExecutor.registerApiHandler(eventType, callback) } // addClientHandler is a helper for Client to add an event handler func addClientHandler(name string, t EventType, c EventCallback) { - api.registerClientHandler(name, t, c) + api.eventExecutor.registerClientHandler(name, t, c) } // RemoveHandler allows to remove API level event handler func RemoveHandler(eventType EventType, callback EventCallback) { - api.removeApiHandler(eventType, callback) + api.eventExecutor.removeApiHandler(eventType, callback) } // removeClientHandler is a helper for Client to add an event handler func removeClientHandler(name string, t EventType, c EventCallback) { - api.removeClientHandler(name, t, c) + api.eventExecutor.removeClientHandler(name, t, c) } // getAPIEventRegistry is a helper for testing diff --git a/pkg/openfeature/openfeature_test.go b/pkg/openfeature/openfeature_test.go index f2f7337d..d5171935 100644 --- a/pkg/openfeature/openfeature_test.go +++ b/pkg/openfeature/openfeature_test.go @@ -175,10 +175,81 @@ func TestRequirement_1_1_2_3(t *testing.T) { break } }) -} -// The provider mutator function MUST invoke the shutdown function on the previously registered provider once it's no -// longer being used to resolve flag values. + t.Run("ignore shutdown for multiple references - default bound", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // setup + provider, _, shutdownSem := setupProviderWithSemaphores() + + // register provider multiple times + err := SetProvider(provider) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + clientName := "clientA" + + err = SetNamedProvider(clientName, provider) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + providerOverride, _, _ := setupProviderWithSemaphores() + + err = SetNamedProvider(clientName, providerOverride) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + // validate + select { + // short enough wait time, but not too long + case <-time.After(100 * time.Millisecond): + break + case <-shutdownSem: + t.Errorf("shutdown called on the provider with multiple references") + } + }) + + t.Run("ignore shutdown for multiple references - name client bound", func(t *testing.T) { + defer t.Cleanup(initSingleton) + + // setup + providerA, _, shutdownSemA := setupProviderWithSemaphores() + + // register provider multiple times + + clientA := "clientA" + clientB := "clientB" + + err := SetNamedProvider(clientA, providerA) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + err = SetNamedProvider(clientB, providerA) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + providerOverride, _, _ := setupProviderWithSemaphores() + + err = SetNamedProvider(clientA, providerOverride) + if err != nil { + t.Errorf("error setting up provider %v", err) + } + + // validate + select { + // short enough wait time, but not too long + case <-time.After(100 * time.Millisecond): + break + case <-shutdownSemA: + t.Errorf("shutdown called on the provider with multiple references") + } + }) +} // The `API` MUST provide a function to bind a given `provider` to one or more client `name`s. // If the client-name already has a bound provider, it is overwritten with the new mapping. @@ -506,29 +577,37 @@ func use(vals ...interface{}) { func setupProviderWithSemaphores() (struct { FeatureProvider StateHandler + EventHandler }, chan interface{}, chan interface{}) { intiSem := make(chan interface{}, 1) shutdownSem := make(chan interface{}, 1) sh := &stateHandlerForTests{ // Semaphore must be invoked - initF: func(e EvaluationContext) { + initF: func(e EvaluationContext) error { intiSem <- "" + return nil }, // Semaphore must be invoked shutdownF: func() { shutdownSem <- "" }, - State: ReadyState, + State: NotReadyState, + } + + eventing := &ProviderEventing{ + c: make(chan Event, 1), } // Derive provider with initialize & shutdown support provider := struct { FeatureProvider StateHandler + EventHandler }{ FeatureProvider: NoopProvider{}, StateHandler: sh, + EventHandler: eventing, } return provider, intiSem, shutdownSem diff --git a/pkg/openfeature/provider.go b/pkg/openfeature/provider.go index b51543af..f97ccbd1 100644 --- a/pkg/openfeature/provider.go +++ b/pkg/openfeature/provider.go @@ -60,7 +60,7 @@ type State string // StateHandler is the contract for initialization & shutdown. // FeatureProvider can opt in for this behavior by implementing the interface type StateHandler interface { - Init(evaluationContext EvaluationContext) + Init(evaluationContext EvaluationContext) error Shutdown() Status() State } @@ -70,8 +70,9 @@ type StateHandler interface { type NoopStateHandler struct { } -func (s *NoopStateHandler) Init(e EvaluationContext) { +func (s *NoopStateHandler) Init(e EvaluationContext) error { // NOOP + return nil } func (s *NoopStateHandler) Shutdown() { diff --git a/pkg/openfeature/provider_mock_test.go b/pkg/openfeature/provider_mock_test.go index 3fb4c3d7..d11ee89c 100644 --- a/pkg/openfeature/provider_mock_test.go +++ b/pkg/openfeature/provider_mock_test.go @@ -156,9 +156,11 @@ func (m *MockStateHandler) EXPECT() *MockStateHandlerMockRecorder { } // Init mocks base method. -func (m *MockStateHandler) Init(evaluationContext EvaluationContext) { +func (m *MockStateHandler) Init(evaluationContext EvaluationContext) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "Init", evaluationContext) + ret := m.ctrl.Call(m, "Init", evaluationContext) + ret0, _ := ret[0].(error) + return ret0 } // Init indicates an expected call of Init. diff --git a/pkg/openfeature/util_test.go b/pkg/openfeature/util_test.go index 08f48669..c4ac6f5a 100644 --- a/pkg/openfeature/util_test.go +++ b/pkg/openfeature/util_test.go @@ -28,17 +28,22 @@ func init() { // stateHandlerForTests is a StateHandler with callbacks type stateHandlerForTests struct { - initF func(e EvaluationContext) + initF func(e EvaluationContext) error shutdownF func() State } -func (s *stateHandlerForTests) Init(e EvaluationContext) { - s.initF(e) +func (s *stateHandlerForTests) Init(e EvaluationContext) error { + if s.initF != nil { + return s.initF(e) + } + return nil } func (s *stateHandlerForTests) Shutdown() { - s.shutdownF() + if s.shutdownF != nil { + s.shutdownF() + } } func (s *stateHandlerForTests) Status() State {