Skip to content

Commit

Permalink
fix: set readiness once only (#465)
Browse files Browse the repository at this point in the history
## This PR

The current readiness state can be changed because of an underlying sync
provider error. This seems to be incorrect as **not ready state** make
flagd to not receive traffic through K8s services [1]

For example, if HTTP endpoint-backed sync provider get an error, the
current logic set readiness to `false`. Similarly, if grpc connectivity
is lost for grpc provider, readiness goes to `false` state till
connection establishment completes.

This PR fix this behavior by setting the ready state **once only** per
sync provider. This means if there is at least a single successful sync,
sync provider sets its readiness to `true`.

Going further, we might need to decide if we need better readiness state
handling.

[1] -
https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes

---------

Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
  • Loading branch information
Kavindu-Dodan and beeme1mr authored Mar 7, 2023
1 parent f5ca5f9 commit 41a888d
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 36 deletions.
20 changes: 19 additions & 1 deletion docs/other_resources/high_level_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,22 @@ flag configurations watched by the respective implementation. For example, the f
The update provided by sync implementation is pushed to the evaluator engine, which interprets the event and forwards it to the state store. Change notifications generated in the
process gets pushed to event subscribers.

<img src="../images/of-flagd-1.png" width="560">
<img src="../images/of-flagd-1.png" width="560">

## Readiness & Liveness probes

Flagd exposes HTTP liveness and readiness probes. These probes can be used for K8s deployments. With default
start-up configurations, these probes are exposed at the following URLs,

- Liveness: http://localhost:8014/healthz
- Readiness: http://localhost:8014/readyz

### Definition of Liveness

The liveness probe becomes active and HTTP 200 status is served as soon as Flagd service is up and running.

### Definition of Readiness

The readiness probe becomes active similar to the liveness probe as soon as Flagd service is up and running. However,
the probe emits HTTP 412 until all sync providers are ready. This status changes to HTTP 200 when all sync providers at
least have one successful data sync. The status does not change from there on.
1 change: 0 additions & 1 deletion pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync {
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
Mux: &msync.RWMutex{},
}
}

Expand Down
20 changes: 13 additions & 7 deletions pkg/sync/file/filepath_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,19 @@ func TestSimpleSync(t *testing.T) {

dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync))

syncHandler := Sync{
URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName),
Logger: logger.NewLogger(nil, false),
Mux: &msync.RWMutex{},
}

go func() {
handler := Sync{
URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName),
Logger: logger.NewLogger(nil, false),
Mux: &msync.RWMutex{},
}
err := handler.Init(ctx)
err := syncHandler.Init(ctx)
if err != nil {
log.Fatalf("Error init sync: %s", err.Error())
return
}
err = handler.Sync(ctx, dataSyncChan)
err = syncHandler.Sync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
Expand Down Expand Up @@ -187,6 +188,11 @@ func TestSimpleSync(t *testing.T) {
case <-time.After(10 * time.Second):
t.Errorf("event not found, timeout out after 10 seconds")
}

// validate readiness - readiness must not change
if syncHandler.ready != true {
t.Errorf("readiness must be set to true, but found: %t", syncHandler.ready)
}
}
})
}
Expand Down
23 changes: 10 additions & 13 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
constantBackOffDelay = 60
)

var once msync.Once

type Sync struct {
Target string
ProviderID string
Expand All @@ -39,7 +41,6 @@ type Sync struct {
client syncv1grpc.FlagSyncServiceClient
options []grpc.DialOption
ready bool
Mux *msync.RWMutex
}

func (g *Sync) connectClient(ctx context.Context) error {
Expand Down Expand Up @@ -84,37 +85,28 @@ func (g *Sync) Init(ctx context.Context) error {
}

func (g *Sync) IsReady() bool {
g.Mux.RLock()
defer g.Mux.RUnlock()
return g.ready
}

func (g *Sync) setReady(val bool) {
g.Mux.Lock()
defer g.Mux.Unlock()
g.ready = val
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// initial stream listening
g.setReady(true)
err := g.handleFlagSync(g.syncClient, dataSync)
if err == nil {
return nil
}

g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))

// retry connection establishment
for {
g.setReady(false)
syncClient, ok := g.connectWithRetry(ctx)
if !ok {
// We shall exit
return nil
}
g.setReady(true)

err = g.handleFlagSync(syncClient, dataSync)
if err != nil {
g.setReady(false)
g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error()))
continue
}
Expand Down Expand Up @@ -168,6 +160,11 @@ func (g *Sync) connectWithRetry(

// handleFlagSync wraps the stream listening and push updates through dataSync channel
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
// Set ready state once only
once.Do(func() {
g.ready = true
})

for {
data, err := stream.Recv()
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions pkg/sync/grpc/grpc_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"log"
"net"
msync "sync"
"testing"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
Expand Down Expand Up @@ -147,7 +146,6 @@ func TestSync_BasicFlagSyncStates(t *testing.T) {
Target: "grpc://test",
ProviderID: "",
Logger: logger.NewLogger(nil, false),
Mux: &msync.RWMutex{},
}

tests := []struct {
Expand Down Expand Up @@ -335,7 +333,6 @@ func Test_StreamListener(t *testing.T) {
Target: target,
ProviderID: "",
Logger: logger.NewLogger(nil, false),
Mux: &msync.RWMutex{},
}

// initialize client
Expand Down
6 changes: 3 additions & 3 deletions pkg/sync/http/http_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
if err != nil {
return err
}

// Set ready state
hs.ready = true

_ = hs.Cron.AddFunc("*/5 * * * *", func() {
Expand Down Expand Up @@ -149,18 +151,16 @@ func (hs *Sync) generateSha(body []byte) string {

func (hs *Sync) Fetch(ctx context.Context) (string, error) {
if hs.URI == "" {
hs.ready = false
return "", errors.New("no HTTP URL string set")
}

body, err := hs.fetchBodyFromURL(ctx, hs.URI)
if err != nil {
hs.ready = false
return "", err
}
if len(body) != 0 {
hs.LastBodySHA = hs.generateSha(body)
}
hs.ready = true

return string(body), nil
}
8 changes: 0 additions & 8 deletions pkg/sync/http/http_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
bearerToken string
lastBodySHA string
handleResponse func(*testing.T, Sync, string, error)
ready bool
}{
"success": {
setup: func(t *testing.T, client *syncmock.MockClient) {
Expand All @@ -84,7 +83,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
t.Errorf("expected fetched to be: '%s', got: '%s'", expected, fetched)
}
},
ready: true,
},
"return an error if no uri": {
setup: func(t *testing.T, client *syncmock.MockClient) {},
Expand All @@ -93,7 +91,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
t.Error("expected err, got nil")
}
},
ready: false,
},
"update last body sha": {
setup: func(t *testing.T, client *syncmock.MockClient) {
Expand All @@ -115,7 +112,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
)
}
},
ready: true,
},
"authorization header": {
setup: func(t *testing.T, client *syncmock.MockClient) {
Expand All @@ -137,7 +133,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
)
}
},
ready: true,
},
}

Expand All @@ -156,9 +151,6 @@ func TestHTTPSync_Fetch(t *testing.T) {
}

fetched, err := httpSync.Fetch(context.Background())
if httpSync.IsReady() != tt.ready {
t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready)
}
tt.handleResponse(t, httpSync, fetched, err)
})
}
Expand Down

0 comments on commit 41a888d

Please sign in to comment.