From 7f2358ce207527c890f4a2f46ce4b9e8bf697095 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 27 Feb 2023 19:54:20 +0100 Subject: [PATCH] feat: add Health and Readiness probes (#418) Signed-off-by: Giovanni Liva Signed-off-by: Giovanni Liva Signed-off-by: Todd Baert Co-authored-by: Todd Baert --- config/samples/example_flags.flagd.json | 6 +- pkg/runtime/from_config.go | 103 +++++++++++++++--------- pkg/runtime/runtime.go | 23 +++++- pkg/service/connect_service.go | 21 +++-- pkg/service/connect_service_test.go | 9 ++- pkg/service/iservice.go | 8 +- pkg/sync/file/filepath_sync.go | 43 +++++++--- pkg/sync/file/filepath_sync_test.go | 57 +++++++++++-- pkg/sync/grpc/grpc_sync.go | 37 +++++++-- pkg/sync/grpc/grpc_sync_test.go | 26 ++++-- pkg/sync/http/http_sync.go | 14 ++++ pkg/sync/http/http_sync_test.go | 8 ++ pkg/sync/isync.go | 6 ++ pkg/sync/kubernetes/kubernetes_sync.go | 13 ++- 14 files changed, 291 insertions(+), 83 deletions(-) diff --git a/config/samples/example_flags.flagd.json b/config/samples/example_flags.flagd.json index 785f793a4..0314b5eda 100644 --- a/config/samples/example_flags.flagd.json +++ b/config/samples/example_flags.flagd.json @@ -126,9 +126,9 @@ }, "$evaluators": { "emailWithFaas": { - "in": ["@faas.com", { - "var": ["email"] - }] + "in": ["@faas.com", { + "var": ["email"] + }] } } } diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 4ff8c4e12..7e313de9c 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "regexp" + msync "sync" "time" "github.com/open-feature/flagd/pkg/eval" @@ -67,48 +68,28 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { for _, uri := range r.config.SyncURI { switch uriB := []byte(uri); { case regFile.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - ), - ProviderArgs: r.config.ProviderArgs, - }) + r.SyncImpl = append( + r.SyncImpl, + r.newFile(uri, logger), + ) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri)) case regCrd.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &kubernetes.Sync{ - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, - }) + r.SyncImpl = append( + r.SyncImpl, + r.newK8s(uri, logger), + ) rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri)) case regURL.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &httpSync.Sync{ - URI: uri, - BearerToken: r.config.SyncBearerToken, - Client: &http.Client{ - Timeout: time.Second * 10, - }, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "remote"), - ), - ProviderArgs: r.config.ProviderArgs, - Cron: cron.New(), - }) + r.SyncImpl = append( + r.SyncImpl, + r.newHTTP(uri, logger), + ) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) case regGRPC.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) + r.SyncImpl = append( + r.SyncImpl, + r.newGRPC(uri, logger), + ) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ " or 'core.openfeature.dev'", uri) @@ -116,3 +97,53 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { } return nil } + +func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync { + return &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + Mux: &msync.RWMutex{}, + } +} + +func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync { + return &httpSync.Sync{ + URI: uri, + BearerToken: r.config.SyncBearerToken, + Client: &http.Client{ + Timeout: time.Second * 10, + }, + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "remote"), + ), + ProviderArgs: r.config.ProviderArgs, + Cron: cron.New(), + } +} + +func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync { + return &kubernetes.Sync{ + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "kubernetes"), + ), + URI: regCrd.ReplaceAllString(uri, ""), + ProviderArgs: r.config.ProviderArgs, + } +} + +func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync { + return &file.Sync{ + URI: regFile.ReplaceAllString(uri, ""), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + ), + ProviderArgs: r.config.ProviderArgs, + Mux: &msync.RWMutex{}, + } +} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 0b9ffefbb..4772eaef3 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -70,7 +70,14 @@ func (r *Runtime) Start() error { } }) - // Start sync providers + // Init sync providers + for _, s := range r.SyncImpl { + if err := s.Init(gCtx); err != nil { + return err + } + } + + // Start sync provider for _, s := range r.SyncImpl { p := s g.Go(func() error { @@ -79,7 +86,9 @@ func (r *Runtime) Start() error { } g.Go(func() error { - return r.Service.Serve(gCtx, r.Evaluator) + return r.Service.Serve(gCtx, r.Evaluator, service.Configuration{ + ReadinessProbe: r.isReady, + }) }) <-gCtx.Done() @@ -89,6 +98,16 @@ func (r *Runtime) Start() error { return nil } +func (r *Runtime) isReady() bool { + // if all providers can watch for flag changes, we are ready. + for _, p := range r.SyncImpl { + if !p.IsReady() { + return false + } + } + return true +} + // updateWithNotify helps to update state and notify listeners func (r *Runtime) updateWithNotify(payload sync.DataSync) { r.mu.Lock() diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index e413f2052..fa92a7e85 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -48,13 +48,13 @@ type eventingConfiguration struct { subs map[interface{}]chan Notification } -func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error { +func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error { s.Eval = eval s.eventingConfiguration = &eventingConfiguration{ subs: make(map[interface{}]chan Notification), mu: &sync.RWMutex{}, } - lis, err := s.setupServer() + lis, err := s.setupServer(svcConf) if err != nil { return err } @@ -87,7 +87,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error } } -func (s *ConnectService) setupServer() (net.Listener, error) { +func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error) { var lis net.Listener var err error mux := http.NewServeMux() @@ -107,15 +107,24 @@ func (s *ConnectService) setupServer() (net.Listener, error) { }) h := Handler("", mdlw, mux) go func() { - s.Logger.Info(fmt.Sprintf("metrics listening at %d", s.ConnectServiceConfiguration.MetricsPort)) + s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort)) server := &http.Server{ Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort), ReadHeaderTimeout: 3 * time.Second, } server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/metrics" { + switch r.URL.Path { + case "/healthz": + w.WriteHeader(http.StatusOK) + case "/readyz": + if svcConf.ReadinessProbe() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusPreconditionFailed) + } + case "/metrics": promhttp.Handler().ServeHTTP(w, r) - } else { + default: w.WriteHeader(http.StatusNotFound) } }) diff --git a/pkg/service/connect_service_test.go b/pkg/service/connect_service_test.go index 10dc3eeeb..4616f3319 100644 --- a/pkg/service/connect_service_test.go +++ b/pkg/service/connect_service_test.go @@ -69,18 +69,23 @@ func TestConnectService_UnixConnection(t *testing.T) { tt.evalFields.reason, tt.evalFields.err, ).AnyTimes() - service := service.ConnectService{ + svc := service.ConnectService{ ConnectServiceConfiguration: &service.ConnectServiceConfiguration{ ServerSocketPath: tt.socketPath, }, Logger: logger.NewLogger(nil, false), } + serveConf := service.Configuration{ + ReadinessProbe: func() bool { + return true + }, + } ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() go func() { - err := service.Serve(ctx, eval) + err := svc.Serve(ctx, eval, serveConf) fmt.Println(err) }() conn, err := grpc.Dial( diff --git a/pkg/service/iservice.go b/pkg/service/iservice.go index f289fa113..1f1603d2f 100644 --- a/pkg/service/iservice.go +++ b/pkg/service/iservice.go @@ -19,12 +19,16 @@ type Notification struct { Data map[string]interface{} `json:"data"` } -type IServiceConfiguration interface{} +type ReadinessProbe func() bool + +type Configuration struct { + ReadinessProbe ReadinessProbe +} /* IService implementations define handlers for a particular transport, which call the IEvaluator implementation. */ type IService interface { - Serve(ctx context.Context, eval eval.IEvaluator) error + Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error Notify(n Notification) } diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index b7227991a..5fc31ebfd 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -7,13 +7,13 @@ import ( "fmt" "os" "strings" - - "github.com/open-feature/flagd/pkg/sync" + msync "sync" "gopkg.in/yaml.v3" "github.com/fsnotify/fsnotify" "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" ) type Sync struct { @@ -22,31 +22,49 @@ type Sync struct { ProviderArgs sync.ProviderArgs // FileType indicates the file type e.g., json, yaml/yml etc., fileType string + watcher *fsnotify.Watcher + ready bool + Mux *msync.RWMutex } // default state is used to prevent EOF errors when handling filepath delete events + empty files const defaultState = "{}" -//nolint:funlen -func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { +func (fs *Sync) Init(ctx context.Context) error { fs.Logger.Info("Starting filepath sync notifier") - watcher, err := fsnotify.NewWatcher() + w, err := fsnotify.NewWatcher() if err != nil { return err } - defer watcher.Close() - - err = watcher.Add(fs.URI) + fs.watcher = w + err = fs.watcher.Add(fs.URI) if err != nil { return err } + return nil +} - fs.sendDataSync(ctx, sync.ALL, dataSync) +func (fs *Sync) IsReady() bool { + fs.Mux.RLock() + defer fs.Mux.RUnlock() + return fs.ready +} + +func (fs *Sync) setReady(val bool) { + fs.Mux.Lock() + defer fs.Mux.Unlock() + fs.ready = val +} +//nolint:funlen +func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + defer fs.watcher.Close() + fs.sendDataSync(ctx, sync.ALL, dataSync) + fs.setReady(true) fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { select { - case event, ok := <-watcher.Events: + case event, ok := <-fs.watcher.Events: if !ok { fs.Logger.Info("filepath notifier closed") return errors.New("filepath notifier closed") @@ -59,7 +77,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { case event.Has(fsnotify.Remove): // K8s exposes config maps as symlinks. // Updates cause a remove event, we need to re-add the watcher in this case. - err = watcher.Add(fs.URI) + err := fs.watcher.Add(fs.URI) if err != nil { // the watcher could not be re-added, so the file must have been deleted fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error())) @@ -80,8 +98,9 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } - case err, ok := <-watcher.Errors: + case err, ok := <-fs.watcher.Errors: if !ok { + fs.setReady(false) return errors.New("watcher error") } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 1d1de2f5f..330973d56 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + msync "sync" "testing" "time" @@ -84,11 +85,6 @@ func TestSimpleSync(t *testing.T) { }, } - handler := Sync{ - URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), - Logger: logger.NewLogger(nil, false), - } - for test, tt := range tests { t.Run(test, func(t *testing.T) { defer t.Cleanup(cleanupFilePath) @@ -96,10 +92,21 @@ func TestSimpleSync(t *testing.T) { createFile(t, fetchDirName, fetchFileName) ctx := context.Background() + dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync)) go func() { - err := handler.Sync(ctx, dataSyncChan) + handler := Sync{ + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, + } + err := handler.Init(ctx) + if err != nil { + log.Fatalf("Error init sync: %s", err.Error()) + return + } + err = handler.Sync(ctx, dataSyncChan) if err != nil { log.Fatalf("Error start sync: %s", err.Error()) return @@ -181,6 +188,44 @@ func TestFilePathSync_Fetch(t *testing.T) { } } +func TestIsReadySyncFlag(t *testing.T) { + fpSync := Sync{ + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, + } + + setupDir(t, fetchDirName) + createFile(t, fetchDirName, fetchFileName) + writeToFile(t, fetchFileContents) + defer t.Cleanup(cleanupFilePath) + if fpSync.IsReady() != false { + t.Errorf("expected not to be ready") + } + ctx := context.TODO() + err := fpSync.Init(ctx) + if err != nil { + log.Printf("Error init sync: %s", err.Error()) + return + } + if fpSync.IsReady() != false { + t.Errorf("expected not to be ready") + } + dataSyncChan := make(chan sync.DataSync, 1) + + go func() { + err = fpSync.Sync(ctx, dataSyncChan) + if err != nil { + log.Fatalf("Error start sync: %s", err.Error()) + return + } + }() + time.Sleep(1 * time.Second) + if fpSync.IsReady() != true { + t.Errorf("expected to be ready") + } +} + func cleanupFilePath() { if err := os.RemoveAll(fetchDirName); err != nil { log.Fatalf("rmdir: %v", err) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 20d96f485..a36c7ac35 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "strings" + msync "sync" "time" "google.golang.org/grpc/credentials/insecure" @@ -34,15 +35,19 @@ type Sync struct { Target string ProviderID string Logger *logger.Logger + client syncv1grpc.FlagSyncService_SyncFlagsClient + options []grpc.DialOption + ready bool + Mux *msync.RWMutex } -func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - options := []grpc.DialOption{ +func (g *Sync) Init(ctx context.Context) error { + g.options = []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } // initial dial and connection. Failure here must result in a startup failure - dial, err := grpc.DialContext(ctx, g.Target, options...) + dial, err := grpc.DialContext(ctx, g.Target, g.options...) if err != nil { g.Logger.Error(fmt.Sprintf("error establishing grpc connection: %s", err.Error())) return err @@ -54,21 +59,39 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error())) return err } + g.client = syncClient + return nil +} +func (g *Sync) IsReady() bool { + g.Mux.RLock() + defer g.Mux.RUnlock() + return g.ready +} + +func (g *Sync) setReady(val bool) { + g.Mux.Lock() + defer g.Mux.Unlock() + g.ready = val +} + +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening - err = g.handleFlagSync(syncClient, dataSync) + g.setReady(true) + err := g.handleFlagSync(g.client, dataSync) g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) - // retry connection establishment for { - syncClient, ok := g.connectWithRetry(ctx, options...) + g.setReady(false) + syncClient, ok := g.connectWithRetry(ctx, g.options...) if !ok { // We shall exit return nil } - + g.setReady(true) err = g.handleFlagSync(syncClient, dataSync) if err != nil { + g.setReady(false) g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) continue } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index d521372c8..b326a2340 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -6,6 +6,7 @@ import ( "io" "log" "net" + msync "sync" "testing" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" @@ -54,12 +55,14 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { Target: "grpc://test", ProviderID: "", Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } tests := []struct { name string stream syncv1grpc.FlagSyncService_SyncFlagsClient want sync.Type + ready bool }{ { name: "State All maps to Sync All", @@ -69,7 +72,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_ALL, }, }, - want: sync.ALL, + want: sync.ALL, + ready: true, }, { name: "State Add maps to Sync Add", @@ -79,7 +83,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_ADD, }, }, - want: sync.ADD, + want: sync.ADD, + ready: true, }, { name: "State Update maps to Sync Update", @@ -89,7 +94,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_UPDATE, }, }, - want: sync.UPDATE, + want: sync.UPDATE, + ready: true, }, { name: "State Delete maps to Sync Delete", @@ -99,7 +105,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_DELETE, }, }, - want: sync.DELETE, + want: sync.DELETE, + ready: true, }, } @@ -108,13 +115,18 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { syncChan := make(chan sync.DataSync) go func() { - err := grpcSyncImpl.handleFlagSync(test.stream, syncChan) + grpcSyncImpl.client = test.stream + err := grpcSyncImpl.Sync(context.TODO(), syncChan) if err != nil { t.Errorf("Error handling flag sync: %s", err.Error()) } }() data := <-syncChan + if grpcSyncImpl.IsReady() != test.ready { + t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready) + } + if data.Type != test.want { t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want) } @@ -231,6 +243,7 @@ func Test_StreamListener(t *testing.T) { Target: target, ProviderID: "", Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } // initialize client @@ -253,7 +266,8 @@ func Test_StreamListener(t *testing.T) { // listen to stream go func() { - err := grpcSync.handleFlagSync(syncClient, syncChan) + grpcSync.client = syncClient + err := grpcSync.Sync(context.TODO(), syncChan) if err != nil { // must ignore EOF as this is returned for stream end if err != io.EOF { diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 3ce78fb80..c2a56460f 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -23,6 +23,7 @@ type Sync struct { LastBodySHA string Logger *logger.Logger ProviderArgs sync.ProviderArgs + ready bool } // Client defines the behaviour required of a http client @@ -37,12 +38,22 @@ type Cron interface { Stop() } +func (hs *Sync) Init(ctx context.Context) error { + // noop + return nil +} + +func (hs *Sync) IsReady() bool { + return hs.ready +} + func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // Initial fetch fetch, err := hs.Fetch(ctx) if err != nil { return err } + hs.ready = true _ = hs.Cron.AddFunc("*/5 * * * *", func() { body, err := hs.fetchBodyFromURL(ctx, hs.URI) @@ -130,15 +141,18 @@ func (hs *Sync) generateSha(body []byte) string { func (hs *Sync) Fetch(ctx context.Context) (string, error) { if hs.URI == "" { + hs.ready = false return "", errors.New("no HTTP URL string set") } body, err := hs.fetchBodyFromURL(ctx, hs.URI) if err != nil { + hs.ready = false return "", err } if len(body) != 0 { hs.LastBodySHA = hs.generateSha(body) } + hs.ready = true return string(body), nil } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 9689d74c1..e7e1596c4 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -64,6 +64,7 @@ func TestHTTPSync_Fetch(t *testing.T) { bearerToken string lastBodySHA string handleResponse func(*testing.T, Sync, string, error) + ready bool }{ "success": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -81,6 +82,7 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Errorf("expected fetched to be: '%s', got: '%s'", expected, fetched) } }, + ready: true, }, "return an error if no uri": { setup: func(t *testing.T, client *syncmock.MockClient) {}, @@ -89,6 +91,7 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Error("expected err, got nil") } }, + ready: false, }, "update last body sha": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -110,6 +113,7 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, + ready: true, }, "authorization header": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -131,6 +135,7 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, + ready: true, }, } @@ -149,6 +154,9 @@ func TestHTTPSync_Fetch(t *testing.T) { } fetched, err := httpSync.Fetch(context.Background()) + if httpSync.IsReady() != tt.ready { + t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready) + } tt.handleResponse(t, httpSync, fetched, err) }) } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 86ed7c2d8..7af60ff9d 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -38,9 +38,15 @@ ISync implementations watch for changes in the flag sources (HTTP backend, local value and communicate to the Runtime with DataSync channel */ type ISync interface { + // Init is used by the sync provider to initialize its data structures and external dependencies. + Init(ctx context.Context) error + // Sync is the contract between Runtime and sync implementation. // Note that, it is expected to return the first data sync as soon as possible to fill the store. Sync(ctx context.Context, dataSync chan<- DataSync) error + + // IsReady shall return true if the provider is ready to communicate with the Runtime + IsReady() bool } // DataSync is the data contract between Runtime and sync implementations diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index f7f094cb1..632599d67 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -30,6 +30,17 @@ type Sync struct { ProviderArgs sync.ProviderArgs client client.Client URI string + ready bool +} + +func (k *Sync) Init(ctx context.Context) error { + // noop + return nil +} + +func (k *Sync) IsReady() bool { + // we cannot reliably check external HTTP(s) sources + return k.ready } func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -74,6 +85,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { k.Logger.Debug("configuration deleted") case DefaultEventTypeReady: k.Logger.Debug("notifier ready") + k.ready = true } } } @@ -198,7 +210,6 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { EventType: DefaultEventTypeReady, }, } - informer.Run(ctx.Done()) }