[cluster] Watch follow-ups #3007

Merged 7 commits on Dec 15, 2020
53 changes: 39 additions & 14 deletions src/cluster/etcd/watchmanager/manager.go
@@ -23,6 +23,7 @@ package watchmanager
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/uber-go/tally"
@@ -80,6 +81,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
if rev > 0 {
wOpts = append(wOpts, clientv3.WithRev(rev))
}

watchChan = watcher.Watch(
ctx,
key,
@@ -91,8 +93,14 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
var (
timeout = w.opts.WatchChanInitTimeout()
cancelWatchFn = func() {
// we *must* both cancel the context and call .Close() on watch to
// properly free resources, and not end up with weird issues due to stale
// grpc streams or bad internal etcd watch state.
cancelFn()
if err := watcher.Close(); err != nil {
// however, there's nothing we can do about an error on watch close,
// and it shouldn't happen in practice - unless we end up
// closing an already closed grpc stream or something similar.
w.logger.Info("error closing watcher", zap.Error(err))
}
}
@@ -102,6 +110,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
case <-doneCh:
return watchChan, cancelWatchFn, nil
case <-time.After(timeout):
cancelWatchFn()
err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
return nil, cancelWatchFn, err
Collaborator:

should it return nil instead of cancelWatchFn here, since it's already called?

Collaborator (Author):

Returning nil would cause a panic for any caller that invokes the returned function, and context cancellations are idempotent in Go, so calling cancelWatchFn again is safe.
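To illustrate that point, here is a minimal, self-contained sketch (not part of this PR): invoking a nil function value panics at runtime, while calling a context's cancel function more than once is a safe no-op.

package main

import (
    "context"
    "fmt"
)

func main() {
    // Cancelling twice is fine: context cancellation is idempotent.
    _, cancel := context.WithCancel(context.Background())
    cancel()
    cancel() // no-op

    // Invoking a nil func value panics, so returning nil instead of the
    // cancel function would break callers that always invoke the result.
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered from:", r)
        }
    }()
    var fn func()
    fn()
}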

}
@@ -111,11 +120,13 @@ func (w *manager) Watch(key string) {
var (
ticker = time.NewTicker(w.opts.WatchChanCheckInterval())
logger = w.logger.With(zap.String("watch_key", key))
rnd = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

revOverride int64
watchChan clientv3.WatchChan
cancelFn context.CancelFunc
err error
revOverride int64
firstUpdateSucceeded bool
watchChan clientv3.WatchChan
cancelFn context.CancelFunc
err error
)

defer ticker.Stop()
@@ -127,7 +138,9 @@ func (w *manager) Watch(key string) {
// set it to nil so it will be recreated
watchChan = nil
// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
dur := w.opts.WatchChanResetInterval()
dur += time.Duration(rnd.Int63n(int64(dur)))
time.Sleep(dur)
}

for {
@@ -140,8 +153,14 @@ func (w *manager) Watch(key string) {

// NB(cw) when we fail to create an etcd watch channel
// we do a get for now and will try to recreate the watch chan later
if err = w.updateFn(key, nil); err != nil {
logger.Error("failed to get value for key", zap.Error(err))
if !firstUpdateSucceeded {
if err = w.updateFn(key, nil); err != nil {
logger.Error("failed to get value for key", zap.Error(err))
} else {
// NB(vytenis): only try initializing once, otherwise there's
// get request amplification, especially for non-existent keys.
firstUpdateSucceeded = true
}
}
resetWatchWithSleep()
continue
@@ -166,20 +185,26 @@ func (w *manager) Watch(key string) {
zap.Error(err),
)
w.m.etcdWatchError.Inc(1)
// do not stop here, even though the update contains an error
// we still take this chance to attempt a Get() for the latest value

// If the current revision has been compacted, set watchChan to
// nil so the watch is recreated with a valid start revision
if err == rpctypes.ErrCompacted {
logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
revOverride = r.CompactRevision
logger.Warn("compacted; recreating watch at revision",
zap.Int64("revision", revOverride))
} else {
logger.Warn("recreating watch due to an error")
logger.Warn("recreating watch due to an error", zap.Error(err))
}

resetWatchWithSleep()
continue
} else if r.IsProgressNotify() {
if r.CompactRevision > revOverride {
// we only care about the last event, as this watchmanager implementation does not
// support watching key ranges, only single keys.
// set revOverride to the minimum non-compacted revision if the watch was
// initialized with an older revision, since we really don't care about history.
// this may help recover faster (one less retry) on connection loss/leader change
// around compaction, if we were watching on a revision that's already compacted.
revOverride = r.CompactRevision
}
// Do not call updateFn on ProgressNotify as it happens periodically with no update events
continue
}
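As a side note on the change above: the reset sleep now adds a random extra delay of up to one full interval, so that many watchers recreating their channels after a shared failure do not all retry at the same instant. A minimal sketch of that pattern, with a hypothetical helper name and assuming a positive base interval (and the usual time and math/rand imports):

// jitteredResetDelay returns base plus a random extra delay in [0, base),
// spreading watch re-creation across watchers after a shared failure.
func jitteredResetDelay(base time.Duration, rnd *rand.Rand) time.Duration {
    return base + time.Duration(rnd.Int63n(int64(base)))
}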
55 changes: 22 additions & 33 deletions src/cluster/etcd/watchmanager/manager_test.go
@@ -119,22 +119,22 @@ func TestWatchRecreate(t *testing.T) {

ec := ecluster.RandClient()

failTotal := 2
failTotal := 1
wh.opts = wh.opts.
SetClient(ec).
SetWatchChanInitTimeout(200 * time.Millisecond).
SetWatchChanResetInterval(100 * time.Millisecond)
SetWatchChanInitTimeout(50 * time.Millisecond).
SetWatchChanResetInterval(50 * time.Millisecond)

go func() {
ecluster.Members[0].DropConnections()
ecluster.Members[0].Blackhole()
wh.Watch("foo")
}()

time.Sleep(2 * wh.opts.WatchChanInitTimeout())
time.Sleep(4 * wh.opts.WatchChanInitTimeout())

// watch will error out but updateFn will be tried
for {
for i := 0; i < 100; i++ {
if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
break
}
@@ -150,7 +150,7 @@ func TestWatchRecreate(t *testing.T) {
_, err := ec.Put(context.Background(), "foo", "v")
require.NoError(t, err)

for {
for i := 0; i < 100; i++ {
if atomic.LoadInt32(updateCalled) > updatesBefore {
break
}
@@ -223,37 +223,13 @@ func TestWatchNoLeader(t *testing.T) {
leaderIdx := ecluster.WaitLeader(t)
require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")

for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(3) {
break
}
time.Sleep(watchInitAndRetryDelay)
}

// simulate quorum loss
ecluster.Members[1].Stop(t)
ecluster.Members[2].Stop(t)

// wait for election timeout, then member[0] will not have a leader.
time.Sleep(electionTimeout)

for i := 0; i < 100; i++ {
// test that leader loss is retried - even on error, we should attempt update.
// 5 is an arbitrary number greater than the number of actual updates
if atomic.LoadInt32(&updateCalled) >= 10 {
break
}
time.Sleep(watchInitAndRetryDelay)
}

updates := atomic.LoadInt32(&updateCalled)
if updates < 10 {
require.Fail(t,
"insufficient update calls",
"expected at least 10 update attempts, got %d during a partition",
updates)
}

require.NoError(t, ecluster.Members[1].Restart(t))
require.NoError(t, ecluster.Members[2].Restart(t))
// wait for leader + election delay just in case
@@ -266,8 +242,21 @@ func TestWatchNoLeader(t *testing.T) {
require.NoError(t, err)

// give some time for watch to be updated
runtime.Gosched()
time.Sleep(watchInitAndRetryDelay)
for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(2) {
break
}
time.Sleep(watchInitAndRetryDelay)
runtime.Gosched()
}

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
require.Fail(t,
"insufficient update calls",
"expected at least 2 update attempts, got %d during a partition",
updates)
}

atomic.AddInt32(&shouldStop, 1)
<-doneCh
@@ -308,7 +297,7 @@ func TestWatchCompactedRevision(t *testing.T) {
go wh.Watch("foo")
time.Sleep(3 * wh.opts.WatchChanInitTimeout())

assert.Equal(t, int32(4), atomic.LoadInt32(updateCalled))
assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled))

lastRead := atomic.LoadInt32(updateCalled)
ec.Put(context.Background(), "foo", "bar-11")
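A general note on the test changes in this file: the unbounded for {} polling loops were replaced with bounded loops so that a regression fails the test instead of hanging it indefinitely. A minimal sketch of that pattern, using a hypothetical helper that is not part of this diff:

// waitFor polls cond up to attempts times, sleeping between checks, and
// reports whether the condition was observed before giving up.
func waitFor(cond func() bool, attempts int, interval time.Duration) bool {
    for i := 0; i < attempts; i++ {
        if cond() {
            return true
        }
        time.Sleep(interval)
    }
    return false
}

For example, the wait for updateCalled could be written as waitFor(func() bool { return atomic.LoadInt32(updateCalled) >= int32(failTotal) }, 100, 10*time.Millisecond).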
43 changes: 25 additions & 18 deletions src/cluster/kv/etcd/store_test.go
@@ -27,7 +27,6 @@ import (
"io/ioutil"
"os"
"path"
"sync/atomic"
"testing"
"time"

@@ -37,6 +36,7 @@ import (
"github.com/m3db/m3/src/x/retry"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
@@ -318,27 +318,35 @@ func TestWatchLastVersion(t *testing.T) {
require.NoError(t, err)
require.Nil(t, w.Get())

var errs int32
lastVersion := 50
var (
doneCh = make(chan struct{})
lastVersion = 50
)

go func() {
for i := 1; i <= lastVersion; i++ {
_, err := store.Set("foo", genProto(fmt.Sprintf("bar%d", i)))
if err != nil {
atomic.AddInt32(&errs, 1)
}
assert.NoError(t, err)
}
}()

for {
<-w.C()
value := w.Get()
if value.Version() == lastVersion-int(atomic.LoadInt32(&errs)) {
break
go func() {
defer close(doneCh)
for {
<-w.C()
value := w.Get()
if value.Version() == lastVersion {
return
}
}
}()

select {
case <-time.After(5 * time.Second):
t.Fatal("test timed out")
case <-doneCh:
}
verifyValue(t, w.Get(), fmt.Sprintf("bar%d", lastVersion), lastVersion)

w.Close()
}

func TestWatchFromExist(t *testing.T) {
@@ -877,7 +885,6 @@ func TestStaleDelete__FromWatch(t *testing.T) {
// in this test we ensure that clients which did not receive a delete for a key
// in their caches evict the value from their cache the next time they communicate
// with an etcd that is unaware of the key (e.g. it has been compacted).

// first, we find the bytes required to be created in the cache file
serverCachePath, err := ioutil.TempDir("", "server-cache-dir")
require.NoError(t, err)
@@ -1156,10 +1163,10 @@ func testCluster(t *testing.T) (*integration.ClusterV3, Options, func()) {
}

opts := NewOptions().
SetWatchChanCheckInterval(50 * time.Millisecond).
SetWatchChanResetInterval(150 * time.Millisecond).
SetWatchChanInitTimeout(150 * time.Millisecond).
SetRequestTimeout(100 * time.Millisecond).
SetWatchChanCheckInterval(100 * time.Millisecond).
SetWatchChanResetInterval(200 * time.Millisecond).
SetWatchChanInitTimeout(200 * time.Millisecond).
SetRequestTimeout(200 * time.Millisecond).
SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)).
SetPrefix("test")
