Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Health and Readiness probes #418

Merged
merged 15 commits into from
Feb 27, 2023
6 changes: 3 additions & 3 deletions config/samples/example_flags.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@
},
"$evaluators": {
"emailWithFaas": {
"in": ["@faas.com", {
"var": ["email"]
}]
"in": ["@faas.com", {
"var": ["email"]
}]
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
103 changes: 67 additions & 36 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"regexp"
msync "sync"
"time"

"github.com/open-feature/flagd/pkg/eval"
Expand Down Expand Up @@ -67,52 +68,82 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
for _, uri := range r.config.SyncURI {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
ProviderArgs: r.config.ProviderArgs,
})
r.SyncImpl = append(
r.SyncImpl,
r.newFile(uri, logger),
)
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
case regCrd.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &kubernetes.Sync{
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
URI: regCrd.ReplaceAllString(uri, ""),
ProviderArgs: r.config.ProviderArgs,
})
r.SyncImpl = append(
r.SyncImpl,
r.newK8s(uri, logger),
)
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
case regURL.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &httpSync.Sync{
URI: uri,
BearerToken: r.config.SyncBearerToken,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
ProviderArgs: r.config.ProviderArgs,
Cron: cron.New(),
})
r.SyncImpl = append(
r.SyncImpl,
r.newHTTP(uri, logger),
)
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri))
case regGRPC.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})
r.SyncImpl = append(
r.SyncImpl,
r.newGRPC(uri, logger),
)
default:
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+
" or 'core.openfeature.dev'", uri)
}
}
return nil
}

func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
Mux: &msync.RWMutex{},
}
}

func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
return &httpSync.Sync{
URI: uri,
BearerToken: r.config.SyncBearerToken,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
ProviderArgs: r.config.ProviderArgs,
Cron: cron.New(),
}
}

func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync {
return &kubernetes.Sync{
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
URI: regCrd.ReplaceAllString(uri, ""),
ProviderArgs: r.config.ProviderArgs,
}
}

func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
ProviderArgs: r.config.ProviderArgs,
Mux: &msync.RWMutex{},
}
}
23 changes: 21 additions & 2 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ func (r *Runtime) Start() error {
}
})

// Start sync providers
// Init sync providers
for _, s := range r.SyncImpl {
if err := s.Init(gCtx); err != nil {
return err
}
}

// Start sync provider
for _, s := range r.SyncImpl {
p := s
g.Go(func() error {
Expand All @@ -79,7 +86,9 @@ func (r *Runtime) Start() error {
}

g.Go(func() error {
return r.Service.Serve(gCtx, r.Evaluator)
return r.Service.Serve(gCtx, r.Evaluator, service.Configuration{
ReadinessProbe: r.isReady,
})
})

<-gCtx.Done()
Expand All @@ -89,6 +98,16 @@ func (r *Runtime) Start() error {
return nil
}

func (r *Runtime) isReady() bool {
// if all providers can watch for flag changes, we are ready.
for _, p := range r.SyncImpl {
if !p.IsReady() {
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
return false
}
}
return true
}

// updateWithNotify helps to update state and notify listeners
func (r *Runtime) updateWithNotify(payload sync.DataSync) {
r.mu.Lock()
Expand Down
21 changes: 15 additions & 6 deletions pkg/service/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ type eventingConfiguration struct {
subs map[interface{}]chan Notification
}

func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error {
func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error {
s.Eval = eval
s.eventingConfiguration = &eventingConfiguration{
subs: make(map[interface{}]chan Notification),
mu: &sync.RWMutex{},
}
lis, err := s.setupServer()
lis, err := s.setupServer(svcConf)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error
}
}

func (s *ConnectService) setupServer() (net.Listener, error) {
func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error) {
var lis net.Listener
var err error
mux := http.NewServeMux()
Expand All @@ -107,15 +107,24 @@ func (s *ConnectService) setupServer() (net.Listener, error) {
})
h := Handler("", mdlw, mux)
go func() {
s.Logger.Info(fmt.Sprintf("metrics listening at %d", s.ConnectServiceConfiguration.MetricsPort))
s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort))
server := &http.Server{
Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort),
ReadHeaderTimeout: 3 * time.Second,
}
server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/metrics" {
switch r.URL.Path {
case "/healthz":
w.WriteHeader(http.StatusOK)
case "/readyz":
if svcConf.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
case "/metrics":
promhttp.Handler().ServeHTTP(w, r)
} else {
default:
w.WriteHeader(http.StatusNotFound)
}
})
Expand Down
9 changes: 7 additions & 2 deletions pkg/service/connect_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,23 @@ func TestConnectService_UnixConnection(t *testing.T) {
tt.evalFields.reason,
tt.evalFields.err,
).AnyTimes()
service := service.ConnectService{
svc := service.ConnectService{
ConnectServiceConfiguration: &service.ConnectServiceConfiguration{
ServerSocketPath: tt.socketPath,
},
Logger: logger.NewLogger(nil, false),
}
serveConf := service.Configuration{
ReadinessProbe: func() bool {
return true
},
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

go func() {
err := service.Serve(ctx, eval)
err := svc.Serve(ctx, eval, serveConf)
fmt.Println(err)
}()
conn, err := grpc.Dial(
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/iservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ type Notification struct {
Data map[string]interface{} `json:"data"`
}

type IServiceConfiguration interface{}
type ReadinessProbe func() bool

type Configuration struct {
ReadinessProbe ReadinessProbe
}

/*
IService implementations define handlers for a particular transport, which call the IEvaluator implementation.
*/
type IService interface {
Serve(ctx context.Context, eval eval.IEvaluator) error
Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error
Notify(n Notification)
}
43 changes: 31 additions & 12 deletions pkg/sync/file/filepath_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"fmt"
"os"
"strings"

"github.com/open-feature/flagd/pkg/sync"
msync "sync"

"gopkg.in/yaml.v3"

"github.com/fsnotify/fsnotify"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/sync"
)

type Sync struct {
Expand All @@ -22,31 +22,49 @@ type Sync struct {
ProviderArgs sync.ProviderArgs
// FileType indicates the file type e.g., json, yaml/yml etc.,
fileType string
watcher *fsnotify.Watcher
ready bool
Mux *msync.RWMutex
}

// default state is used to prevent EOF errors when handling filepath delete events + empty files
const defaultState = "{}"

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
func (fs *Sync) Init(ctx context.Context) error {
fs.Logger.Info("Starting filepath sync notifier")
watcher, err := fsnotify.NewWatcher()
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()

err = watcher.Add(fs.URI)
fs.watcher = w
err = fs.watcher.Add(fs.URI)
if err != nil {
return err
}
return nil
}

fs.sendDataSync(ctx, sync.ALL, dataSync)
func (fs *Sync) IsReady() bool {
fs.Mux.RLock()
skyerus marked this conversation as resolved.
Show resolved Hide resolved
defer fs.Mux.RUnlock()
return fs.ready
}

func (fs *Sync) setReady(val bool) {
fs.Mux.Lock()
defer fs.Mux.Unlock()
fs.ready = val
}

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
defer fs.watcher.Close()
fs.sendDataSync(ctx, sync.ALL, dataSync)
fs.setReady(true)
fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI))
for {
select {
case event, ok := <-watcher.Events:
case event, ok := <-fs.watcher.Events:
if !ok {
fs.Logger.Info("filepath notifier closed")
return errors.New("filepath notifier closed")
Expand All @@ -59,7 +77,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
case event.Has(fsnotify.Remove):
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
err = watcher.Add(fs.URI)
err := fs.watcher.Add(fs.URI)
if err != nil {
// the watcher could not be re-added, so the file must have been deleted
fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error()))
Expand All @@ -80,8 +98,9 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}

case err, ok := <-watcher.Errors:
case err, ok := <-fs.watcher.Errors:
if !ok {
fs.setReady(false)
return errors.New("watcher error")
}

Expand Down
Loading