Skip to content

Commit

Permalink
feat: add Health and Readiness probes (#418)
Browse files Browse the repository at this point in the history
Signed-off-by: Giovanni Liva <giovanni.liva@dynatrace.com>
Signed-off-by: Giovanni Liva <this_that@hotmail.it>
Signed-off-by: Todd Baert <toddbaert@gmail.com>
Co-authored-by: Todd Baert <toddbaert@gmail.com>
  • Loading branch information
thisthat and toddbaert authored Feb 27, 2023
1 parent 73d7840 commit 7f2358c
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 83 deletions.
6 changes: 3 additions & 3 deletions config/samples/example_flags.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@
},
"$evaluators": {
"emailWithFaas": {
"in": ["@faas.com", {
"var": ["email"]
}]
"in": ["@faas.com", {
"var": ["email"]
}]
}
}
}
103 changes: 67 additions & 36 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"regexp"
msync "sync"
"time"

"github.com/open-feature/flagd/pkg/eval"
Expand Down Expand Up @@ -67,52 +68,82 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
for _, uri := range r.config.SyncURI {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
ProviderArgs: r.config.ProviderArgs,
})
r.SyncImpl = append(
r.SyncImpl,
r.newFile(uri, logger),
)
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
case regCrd.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &kubernetes.Sync{
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
URI: regCrd.ReplaceAllString(uri, ""),
ProviderArgs: r.config.ProviderArgs,
})
r.SyncImpl = append(
r.SyncImpl,
r.newK8s(uri, logger),
)
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
case regURL.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &httpSync.Sync{
URI: uri,
BearerToken: r.config.SyncBearerToken,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
ProviderArgs: r.config.ProviderArgs,
Cron: cron.New(),
})
r.SyncImpl = append(
r.SyncImpl,
r.newHTTP(uri, logger),
)
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri))
case regGRPC.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})
r.SyncImpl = append(
r.SyncImpl,
r.newGRPC(uri, logger),
)
default:
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+
" or 'core.openfeature.dev'", uri)
}
}
return nil
}

func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
Mux: &msync.RWMutex{},
}
}

func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
return &httpSync.Sync{
URI: uri,
BearerToken: r.config.SyncBearerToken,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
ProviderArgs: r.config.ProviderArgs,
Cron: cron.New(),
}
}

func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync {
return &kubernetes.Sync{
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
URI: regCrd.ReplaceAllString(uri, ""),
ProviderArgs: r.config.ProviderArgs,
}
}

func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
ProviderArgs: r.config.ProviderArgs,
Mux: &msync.RWMutex{},
}
}
23 changes: 21 additions & 2 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ func (r *Runtime) Start() error {
}
})

// Start sync providers
// Init sync providers
for _, s := range r.SyncImpl {
if err := s.Init(gCtx); err != nil {
return err
}
}

// Start sync provider
for _, s := range r.SyncImpl {
p := s
g.Go(func() error {
Expand All @@ -79,7 +86,9 @@ func (r *Runtime) Start() error {
}

g.Go(func() error {
return r.Service.Serve(gCtx, r.Evaluator)
return r.Service.Serve(gCtx, r.Evaluator, service.Configuration{
ReadinessProbe: r.isReady,
})
})

<-gCtx.Done()
Expand All @@ -89,6 +98,16 @@ func (r *Runtime) Start() error {
return nil
}

func (r *Runtime) isReady() bool {
// if all providers can watch for flag changes, we are ready.
for _, p := range r.SyncImpl {
if !p.IsReady() {
return false
}
}
return true
}

// updateWithNotify helps to update state and notify listeners
func (r *Runtime) updateWithNotify(payload sync.DataSync) {
r.mu.Lock()
Expand Down
21 changes: 15 additions & 6 deletions pkg/service/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ type eventingConfiguration struct {
subs map[interface{}]chan Notification
}

func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error {
func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error {
s.Eval = eval
s.eventingConfiguration = &eventingConfiguration{
subs: make(map[interface{}]chan Notification),
mu: &sync.RWMutex{},
}
lis, err := s.setupServer()
lis, err := s.setupServer(svcConf)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error
}
}

func (s *ConnectService) setupServer() (net.Listener, error) {
func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error) {
var lis net.Listener
var err error
mux := http.NewServeMux()
Expand All @@ -107,15 +107,24 @@ func (s *ConnectService) setupServer() (net.Listener, error) {
})
h := Handler("", mdlw, mux)
go func() {
s.Logger.Info(fmt.Sprintf("metrics listening at %d", s.ConnectServiceConfiguration.MetricsPort))
s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort))
server := &http.Server{
Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort),
ReadHeaderTimeout: 3 * time.Second,
}
server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics" {
switch r.URL.Path {
case "/healthz":
w.WriteHeader(http.StatusOK)
case "/readyz":
if svcConf.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
case "/metrics":
promhttp.Handler().ServeHTTP(w, r)
} else {
default:
w.WriteHeader(http.StatusNotFound)
}
})
Expand Down
9 changes: 7 additions & 2 deletions pkg/service/connect_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,23 @@ func TestConnectService_UnixConnection(t *testing.T) {
tt.evalFields.reason,
tt.evalFields.err,
).AnyTimes()
service := service.ConnectService{
svc := service.ConnectService{
ConnectServiceConfiguration: &service.ConnectServiceConfiguration{
ServerSocketPath: tt.socketPath,
},
Logger: logger.NewLogger(nil, false),
}
serveConf := service.Configuration{
ReadinessProbe: func() bool {
return true
},
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

go func() {
err := service.Serve(ctx, eval)
err := svc.Serve(ctx, eval, serveConf)
fmt.Println(err)
}()
conn, err := grpc.Dial(
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/iservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ type Notification struct {
Data map[string]interface{} `json:"data"`
}

type IServiceConfiguration interface{}
type ReadinessProbe func() bool

type Configuration struct {
ReadinessProbe ReadinessProbe
}

/*
IService implementations define handlers for a particular transport, which call the IEvaluator implementation.
*/
type IService interface {
Serve(ctx context.Context, eval eval.IEvaluator) error
Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error
Notify(n Notification)
}
43 changes: 31 additions & 12 deletions pkg/sync/file/filepath_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"fmt"
"os"
"strings"

"github.com/open-feature/flagd/pkg/sync"
msync "sync"

"gopkg.in/yaml.v3"

"github.com/fsnotify/fsnotify"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/sync"
)

type Sync struct {
Expand All @@ -22,31 +22,49 @@ type Sync struct {
ProviderArgs sync.ProviderArgs
// FileType indicates the file type e.g., json, yaml/yml etc.,
fileType string
watcher *fsnotify.Watcher
ready bool
Mux *msync.RWMutex
}

// default state is used to prevent EOF errors when handling filepath delete events + empty files
const defaultState = "{}"

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
func (fs *Sync) Init(ctx context.Context) error {
fs.Logger.Info("Starting filepath sync notifier")
watcher, err := fsnotify.NewWatcher()
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()

err = watcher.Add(fs.URI)
fs.watcher = w
err = fs.watcher.Add(fs.URI)
if err != nil {
return err
}
return nil
}

fs.sendDataSync(ctx, sync.ALL, dataSync)
func (fs *Sync) IsReady() bool {
fs.Mux.RLock()
defer fs.Mux.RUnlock()
return fs.ready
}

func (fs *Sync) setReady(val bool) {
fs.Mux.Lock()
defer fs.Mux.Unlock()
fs.ready = val
}

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
defer fs.watcher.Close()
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.setReady(true)
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for {
select {
case event, ok := <-watcher.Events:
case event, ok := <-fs.watcher.Events:
if !ok {
fs.Logger.Info("filepath notifier closed")
return errors.New("filepath notifier closed")
Expand All @@ -59,7 +77,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
err = watcher.Add(fs.URI)
err := fs.watcher.Add(fs.URI)
if err != nil {
// the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
Expand All @@ -80,8 +98,9 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}

case err, ok := <-watcher.Errors:
case err, ok := <-fs.watcher.Errors:
if !ok {
fs.setReady(false)
return errors.New("watcher error")
}

Expand Down
Loading

1 comment on commit 7f2358c

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Go Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.30.

Benchmark suite Current: 7f2358c Previous: 04014e7 Ratio
BenchmarkResolveBooleanValue/test_targetingBoolFlag 18752 ns/op 4817 B/op 80 allocs/op 12543 ns/op 4817 B/op 80 allocs/op 1.50
BenchmarkResolveBooleanValue/test_staticObjectFlag 1593 ns/op 96 B/op 4 allocs/op 1222 ns/op 96 B/op 4 allocs/op 1.30
BenchmarkResolveBooleanValue/test_missingFlag 1921 ns/op 160 B/op 6 allocs/op 1418 ns/op 160 B/op 6 allocs/op 1.35
BenchmarkResolveStringValue/test_targetingStringFlag 17785 ns/op 4841 B/op 82 allocs/op 12218 ns/op 4841 B/op 82 allocs/op 1.46
BenchmarkResolveStringValue/test_missingFlag 1890 ns/op 160 B/op 6 allocs/op 1413 ns/op 160 B/op 6 allocs/op 1.34
BenchmarkResolveIntValue/test_targetingNumberFlag 15088 ns/op 4825 B/op 80 allocs/op 11332 ns/op 4825 B/op 80 allocs/op 1.33
BenchmarkResolveIntValue/test_missingFlag 1938 ns/op 144 B/op 6 allocs/op 1413 ns/op 144 B/op 6 allocs/op 1.37
BenchmarkResolveObjectValue/test_targetingObjectFlag 21681 ns/op 6106 B/op 104 allocs/op 15701 ns/op 6106 B/op 104 allocs/op 1.38
BenchmarkResolveObjectValue/test_missingFlag 1920 ns/op 160 B/op 6 allocs/op 1424 ns/op 160 B/op 6 allocs/op 1.35
BenchmarkConnectService_ResolveObject/happy_path 5581 ns/op 1640 B/op 27 allocs/op 4198 ns/op 1640 B/op 27 allocs/op 1.33

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.