From faa15427d5f28e8d1032c7eaa4c9af465ec6e4d2 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 01:19:30 -0500
Subject: [PATCH 1/6] [cluster] Watch follow-ups

---
 src/cluster/etcd/watchmanager/manager.go | 53 +++++++++++++++++-------
 src/cluster/kv/etcd/store_test.go        | 46 ++++++++++++--------
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go
index 02fefd320e..d837328370 100644
--- a/src/cluster/etcd/watchmanager/manager.go
+++ b/src/cluster/etcd/watchmanager/manager.go
@@ -23,6 +23,7 @@ package watchmanager
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/uber-go/tally"
@@ -80,6 +81,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
 	if rev > 0 {
 		wOpts = append(wOpts, clientv3.WithRev(rev))
 	}
+
 	watchChan = watcher.Watch(
 		ctx,
 		key,
@@ -91,8 +93,14 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
 	var (
 		timeout       = w.opts.WatchChanInitTimeout()
 		cancelWatchFn = func() {
+			// we *must* both cancel the context and call .Close() on watch to
+			// properly free resources, and not end up with weird issues due to stale
+			// grpc streams or bad internal etcd watch state.
 			cancelFn()
 			if err := watcher.Close(); err != nil {
+				// however, there's nothing we can do about an error on watch close,
+				// and it shouldn't happen in practice - unless we end up
+				// closing an already closed grpc stream or something similar.
 				w.logger.Info("error closing watcher", zap.Error(err))
 			}
 		}
@@ -102,6 +110,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
 	case <-doneCh:
 		return watchChan, cancelWatchFn, nil
 	case <-time.After(timeout):
+		cancelWatchFn()
 		err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
 		return nil, cancelWatchFn, err
 	}
@@ -111,11 +120,13 @@ func (w *manager) Watch(key string) {
 	var (
 		ticker = time.NewTicker(w.opts.WatchChanCheckInterval())
 		logger = w.logger.With(zap.String("watch_key", key))
+		rnd    = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec
 
-		revOverride int64
-		watchChan   clientv3.WatchChan
-		cancelFn    context.CancelFunc
-		err         error
+		revOverride      int64
+		watchInitialized bool
+		watchChan        clientv3.WatchChan
+		cancelFn         context.CancelFunc
+		err              error
 	)
 	defer ticker.Stop()
@@ -127,7 +138,9 @@
 		// set it to nil so it will be recreated
 		watchChan = nil
 		// avoid recreating watch channel too frequently
-		time.Sleep(w.opts.WatchChanResetInterval())
+		dur := w.opts.WatchChanResetInterval()
+		dur += time.Duration(rnd.Int63n(int64(dur)))
+		time.Sleep(dur)
 	}
 
 	for {
@@ -140,8 +153,14 @@ func (w *manager) Watch(key string) {
 
 			// NB(cw) when we failed to create a etcd watch channel
 			// we do a get for now and will try to recreate the watch chan later
-			if err = w.updateFn(key, nil); err != nil {
-				logger.Error("failed to get value for key", zap.Error(err))
+			if !watchInitialized {
+				if err = w.updateFn(key, nil); err != nil {
+					logger.Error("failed to get value for key", zap.Error(err))
+				} else {
+					// NB(vytenis): only try initializing once, otherwise there's
+					// get request amplification, especially for non-existent keys.
+					watchInitialized = true
+				}
 			}
 			resetWatchWithSleep()
 			continue
 		}
@@ -166,20 +185,26 @@
 					zap.Error(err),
 				)
 				w.m.etcdWatchError.Inc(1)
-				// do not stop here, even though the update contains an error
-				// we still take this chance to attempt a Get() for the latest value
-
-				// If the current revision has been compacted, set watchChan to
-				// nil so the watch is recreated with a valid start revision
 				if err == rpctypes.ErrCompacted {
-					logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
 					revOverride = r.CompactRevision
+					logger.Warn("compacted; recreating watch at revision",
+						zap.Int64("revision", revOverride))
 				} else {
-					logger.Warn("recreating watch due to an error")
+					logger.Warn("recreating watch due to an error", zap.Error(err))
 				}
 				resetWatchWithSleep()
+				continue
 			} else if r.IsProgressNotify() {
+				if r.CompactRevision > revOverride {
+					// we only care about the last event as this watchmanager implementation does not support
+					// watching key ranges, only single keys.
+					// set revOverride to minimum non-compacted revision if watch was
+					// initialized with an older revision, since we really don't care about history.
+					// this may help recover faster (one less retry) on connection loss/leader change
+					// around compaction, if we were watching on a revision that's already compacted.
+					revOverride = r.CompactRevision
+				}
 				// Do not call updateFn on ProgressNotify as it happens periodically with no update events
 				continue
 			}
 		}
diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go
index 8924095d3f..f0f31201c9 100644
--- a/src/cluster/kv/etcd/store_test.go
+++ b/src/cluster/kv/etcd/store_test.go
@@ -27,10 +27,11 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
-	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/m3db/m3/src/cluster/generated/proto/kvtest"
 	"github.com/m3db/m3/src/cluster/kv"
 	xclock "github.com/m3db/m3/src/x/clock"
@@ -317,28 +318,37 @@ func TestWatchLastVersion(t *testing.T) {
 	w, err := store.Watch("foo")
 	require.NoError(t, err)
 	require.Nil(t, w.Get())
+	defer w.Close()
+
+	var (
+		doneCh      = make(chan struct{})
+		lastVersion = 50
+	)
 
-	var errs int32
-	lastVersion := 50
 	go func() {
 		for i := 1; i <= lastVersion; i++ {
 			_, err := store.Set("foo", genProto(fmt.Sprintf("bar%d", i)))
-			if err != nil {
-				atomic.AddInt32(&errs, 1)
-			}
+			assert.NoError(t, err)
 		}
 	}()
 
-	for {
-		<-w.C()
-		value := w.Get()
-		if value.Version() == lastVersion-int(atomic.LoadInt32(&errs)) {
-			break
+	go func() {
+		defer close(doneCh)
+		for {
+			<-w.C()
+			value := w.Get()
+			if value.Version() == lastVersion {
+				return
+			}
 		}
+	}()
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatal("test timed out")
+	case <-doneCh:
 	}
 
 	verifyValue(t, w.Get(), fmt.Sprintf("bar%d", lastVersion), lastVersion)
-
-	w.Close()
 }
 
 func TestWatchFromExist(t *testing.T) {
@@ -800,6 +810,7 @@ func TestStaleDelete__FromGet(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(serverCachePath)
 	ec, opts, closeFn := testStore(t)
+	defer closeFn()
 
 	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))
@@ -883,6 +894,7 @@ func TestStaleDelete__FromWatch(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(serverCachePath)
 	ec, opts, closeFn := testStore(t)
+	defer closeFn()
 
 	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))
@@ -1156,10 +1168,10 @@ func testCluster(t *testing.T) (*integration.ClusterV3, Options, func()) {
 	}
 
 	opts := NewOptions().
-		SetWatchChanCheckInterval(50 * time.Millisecond).
-		SetWatchChanResetInterval(150 * time.Millisecond).
-		SetWatchChanInitTimeout(150 * time.Millisecond).
-		SetRequestTimeout(100 * time.Millisecond).
+		SetWatchChanCheckInterval(100 * time.Millisecond).
+		SetWatchChanResetInterval(200 * time.Millisecond).
+		SetWatchChanInitTimeout(200 * time.Millisecond).
+		SetRequestTimeout(200 * time.Millisecond).
 		SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)).
 		SetPrefix("test")

From 6f915ba5b37f5b154090db9cad8f61358cf937a0 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 15:10:04 -0500
Subject: [PATCH 2/6] fix tests

---
 src/cluster/kv/etcd/store_test.go | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go
index f0f31201c9..bbba9f9966 100644
--- a/src/cluster/kv/etcd/store_test.go
+++ b/src/cluster/kv/etcd/store_test.go
@@ -30,14 +30,13 @@ import (
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
-
 	"github.com/m3db/m3/src/cluster/generated/proto/kvtest"
 	"github.com/m3db/m3/src/cluster/kv"
 	xclock "github.com/m3db/m3/src/x/clock"
 	"github.com/m3db/m3/src/x/retry"
 
 	"github.com/golang/protobuf/proto"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/integration"
@@ -318,7 +317,6 @@ func TestWatchLastVersion(t *testing.T) {
 	w, err := store.Watch("foo")
 	require.NoError(t, err)
 	require.Nil(t, w.Get())
-	defer w.Close()
 
 	var (
 		doneCh      = make(chan struct{})
 		lastVersion = 50
 	)
@@ -810,7 +808,6 @@ func TestStaleDelete__FromGet(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(serverCachePath)
 	ec, opts, closeFn := testStore(t)
-	defer closeFn()
 
 	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))
@@ -888,13 +885,11 @@ func TestStaleDelete__FromWatch(t *testing.T) {
 	// in this test we ensure clients who did not receive a delete for a key in
 	// their caches, evict the value in their cache the next time they communicate
 	// with an etcd which is unaware of the key (e.g. it's been compacted).
-	// first, we find the bytes required to be created in the cache file
 	serverCachePath, err := ioutil.TempDir("", "server-cache-dir")
 	require.NoError(t, err)
 	defer os.RemoveAll(serverCachePath)
 	ec, opts, closeFn := testStore(t)
-	defer closeFn()
 
 	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))

From 9cf245975c5931fb710cb716ebfe6ad01b9b112f Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 15:33:17 -0500
Subject: [PATCH 3/6] tests pt2

---
 src/cluster/etcd/watchmanager/manager_test.go | 29 +++++++------------
 1 file changed, 10 insertions(+), 19 deletions(-)

diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go
index ad1eda0d33..3bfc99ce47 100644
--- a/src/cluster/etcd/watchmanager/manager_test.go
+++ b/src/cluster/etcd/watchmanager/manager_test.go
@@ -224,7 +224,7 @@ func TestWatchNoLeader(t *testing.T) {
 	require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
 
 	for i := 0; i < 10; i++ {
-		if atomic.LoadInt32(&updateCalled) == int32(3) {
+		if atomic.LoadInt32(&updateCalled) == int32(1) {
 			break
 		}
 		time.Sleep(watchInitAndRetryDelay)
@@ -237,23 +237,6 @@ func TestWatchNoLeader(t *testing.T) {
 	// wait for election timeout, then member[0] will not have a leader.
 	time.Sleep(electionTimeout)
 
-	for i := 0; i < 100; i++ {
-		// test that leader loss is retried - even on error, we should attempt update.
-		// 5 is an arbitraty number greater than amount of actual updates
-		if atomic.LoadInt32(&updateCalled) >= 10 {
-			break
-		}
-		time.Sleep(watchInitAndRetryDelay)
-	}
-
-	updates := atomic.LoadInt32(&updateCalled)
-	if updates < 10 {
-		require.Fail(t,
-			"insufficient update calls",
-			"expected at least 10 update attempts, got %d during a partition",
-			updates)
-	}
-
 	require.NoError(t, ecluster.Members[1].Restart(t))
 	require.NoError(t, ecluster.Members[2].Restart(t))
 	// wait for leader + election delay just in case
@@ -269,6 +252,14 @@
 	runtime.Gosched()
 	time.Sleep(watchInitAndRetryDelay)
 
+	updates := atomic.LoadInt32(&updateCalled)
+	if updates < 2 {
+		require.Fail(t,
+			"insufficient update calls",
+			"expected at least 2 update attempts, got %d during a partition",
+			updates)
+	}
+
 	atomic.AddInt32(&shouldStop, 1)
 	<-doneCh
@@ -308,7 +299,7 @@
 	go wh.Watch("foo")
 
 	time.Sleep(3 * wh.opts.WatchChanInitTimeout())
-	assert.Equal(t, int32(4), atomic.LoadInt32(updateCalled))
+	assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled))
 
 	lastRead := atomic.LoadInt32(updateCalled)
 	ec.Put(context.Background(), "foo", "bar-11")

From 7b9f805a6f00a2ad1e9cc79eb17b1c40a17a5670 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 15:45:29 -0500
Subject: [PATCH 4/6] .
---
 src/cluster/etcd/watchmanager/manager_test.go | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go
index 3bfc99ce47..b9cbbe6e83 100644
--- a/src/cluster/etcd/watchmanager/manager_test.go
+++ b/src/cluster/etcd/watchmanager/manager_test.go
@@ -223,13 +223,6 @@ func TestWatchNoLeader(t *testing.T) {
 	leaderIdx := ecluster.WaitLeader(t)
 	require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
 
-	for i := 0; i < 10; i++ {
-		if atomic.LoadInt32(&updateCalled) == int32(1) {
-			break
-		}
-		time.Sleep(watchInitAndRetryDelay)
-	}
-
 	// simulate quorum loss
 	ecluster.Members[1].Stop(t)
 	ecluster.Members[2].Stop(t)
@@ -249,8 +242,13 @@ func TestWatchNoLeader(t *testing.T) {
 	require.NoError(t, err)
 
 	// give some time for watch to be updated
-	runtime.Gosched()
-	time.Sleep(watchInitAndRetryDelay)
+	for i := 0; i < 10; i++ {
+		if atomic.LoadInt32(&updateCalled) == int32(2) {
+			break
+		}
+		time.Sleep(watchInitAndRetryDelay)
+		runtime.Gosched()
+	}
 
 	updates := atomic.LoadInt32(&updateCalled)
 	if updates < 2 {

From c4a73847310bbf64995bf274a1e56853d11b9c23 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 16:12:38 -0500
Subject: [PATCH 5/6] fix test

---
 src/cluster/etcd/watchmanager/manager_test.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go
index b9cbbe6e83..d3720132eb 100644
--- a/src/cluster/etcd/watchmanager/manager_test.go
+++ b/src/cluster/etcd/watchmanager/manager_test.go
@@ -119,11 +119,11 @@ func TestWatchRecreate(t *testing.T) {
 
 	ec := ecluster.RandClient()
 
-	failTotal := 2
+	failTotal := 1
 	wh.opts = wh.opts.
 		SetClient(ec).
-		SetWatchChanInitTimeout(200 * time.Millisecond).
-		SetWatchChanResetInterval(100 * time.Millisecond)
+		SetWatchChanInitTimeout(50 * time.Millisecond).
+		SetWatchChanResetInterval(50 * time.Millisecond)
 
 	go func() {
 		ecluster.Members[0].DropConnections()
@@ -131,10 +131,10 @@ func TestWatchRecreate(t *testing.T) {
 		wh.Watch("foo")
 	}()
 
-	time.Sleep(2 * wh.opts.WatchChanInitTimeout())
+	time.Sleep(4 * wh.opts.WatchChanInitTimeout())
 
 	// watch will error out but updateFn will be tried
-	for {
+	for i := 0; i < 100; i++ {
 		if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
 			break
 		}
@@ -150,7 +150,7 @@ func TestWatchRecreate(t *testing.T) {
 	_, err := ec.Put(context.Background(), "foo", "v")
 	require.NoError(t, err)
 
-	for {
+	for i := 0; i < 100; i++ {
 		if atomic.LoadInt32(updateCalled) > updatesBefore {
 			break
 		}

From 654274e6d75acdc13ac4775727484d708e8f4ee4 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis
Date: Mon, 14 Dec 2020 16:54:23 -0500
Subject: [PATCH 6/6] naming is hard

---
 src/cluster/etcd/watchmanager/manager.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go
index d837328370..ccdcf31066 100644
--- a/src/cluster/etcd/watchmanager/manager.go
+++ b/src/cluster/etcd/watchmanager/manager.go
@@ -122,11 +122,11 @@ func (w *manager) Watch(key string) {
 		logger = w.logger.With(zap.String("watch_key", key))
 		rnd    = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec
 
-		revOverride      int64
-		watchInitialized bool
-		watchChan        clientv3.WatchChan
-		cancelFn         context.CancelFunc
-		err              error
+		revOverride          int64
+		firstUpdateSucceeded bool
+		watchChan            clientv3.WatchChan
+		cancelFn             context.CancelFunc
+		err                  error
 	)
 	defer ticker.Stop()
@@ -153,13 +153,13 @@ func (w *manager) Watch(key string) {
 
 			// NB(cw) when we failed to create a etcd watch channel
 			// we do a get for now and will try to recreate the watch chan later
-			if !watchInitialized {
+			if !firstUpdateSucceeded {
 				if err = w.updateFn(key, nil); err != nil {
 					logger.Error("failed to get value for key", zap.Error(err))
 				} else {
					// NB(vytenis): only try initializing once, otherwise there's
					// get request amplification, especially for non-existent keys.
-					watchInitialized = true
+					firstUpdateSucceeded = true
 				}
 			}
 			resetWatchWithSleep()
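
A note on the randomized reset interval introduced in PATCH 1/6: dur += time.Duration(rnd.Int63n(int64(dur))) makes the sleep before a watch channel is recreated uniformly distributed over [dur, 2*dur), so a fleet of watchers that lost their channels at the same moment spreads its reconnects instead of hitting etcd in lockstep. A minimal standalone sketch of the same arithmetic (jitteredReset is a hypothetical helper name; the patch inlines this logic in manager.Watch):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredReset returns base plus a random jitter in [0, base), i.e. a
// duration uniformly distributed in [base, 2*base).
func jitteredReset(base time.Duration, rnd *rand.Rand) time.Duration {
	return base + time.Duration(rnd.Int63n(int64(base)))
}

func main() {
	// crypto-grade randomness is not needed for jitter, which is why the
	// patch can carry a //nolint:gosec annotation on its math/rand usage.
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredReset(200*time.Millisecond, rnd))
	}
}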
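On the watchChanWithTimeout change in PATCH 1/6: the timeout branch now calls cancelWatchFn() before returning, because abandoning a half-created watch requires both canceling its context and closing the watcher, otherwise a stale gRPC stream lingers. Below is a simplified sketch of that cleanup contract against a plain clientv3 client; newWatchWithTimeout is illustrative only and stands in for the manager's actual method (it waits on a clientv3.WithCreatedNotify response instead of the manager's internal doneCh):

package watchsketch

import (
	"context"
	"errors"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// newWatchWithTimeout creates a watch and waits, bounded by timeout, for the
// server to acknowledge it. Every failure path runs the full cleanup: cancel
// the context *and* close the watcher.
func newWatchWithTimeout(
	cli *clientv3.Client, key string, timeout time.Duration,
) (clientv3.WatchChan, func(), error) {
	ctx, cancelCtx := context.WithCancel(context.Background())
	watcher := clientv3.NewWatcher(cli)
	cleanup := func() {
		cancelCtx()
		watcher.Close() // a close error is unactionable, as the patch notes
	}

	wch := watcher.Watch(ctx, key, clientv3.WithCreatedNotify())
	select {
	case resp, ok := <-wch:
		if !ok || resp.Err() != nil {
			cleanup()
			return nil, nil, errors.New("etcd watch create failed")
		}
		// first response is the watch-created notification
		return wch, cleanup, nil
	case <-time.After(timeout):
		cleanup() // the follow-up fix: tear down before returning the error
		return nil, nil, errors.New("etcd watch create timed out")
	}
}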
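On the revision bookkeeping in PATCH 1/6: when a watch fails with rpctypes.ErrCompacted it is recreated at the response's CompactRevision (the oldest revision the server still has), and progress notifications keep that restart revision ahead of the compaction horizon, so a reconnect around a compaction does not cost an extra retry. A hedged sketch of the same decision logic, with nextStartRev as a hypothetical helper (the patch keeps this state in Watch's revOverride variable):

package watchsketch

import (
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// nextStartRev decides which revision a recreated single-key watch should
// start from, given the latest response and the current override
// (0 means "no override": watch from the current revision).
func nextStartRev(resp clientv3.WatchResponse, revOverride int64) int64 {
	if err := resp.Err(); err == rpctypes.ErrCompacted {
		// the watched revision was compacted away: restart from the oldest
		// revision the server still has instead of failing forever.
		return resp.CompactRevision
	}
	if resp.IsProgressNotify() && resp.CompactRevision > revOverride {
		// keep the override ahead of the compaction horizon
		return resp.CompactRevision
	}
	return revOverride
}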