From faf64a2e52d105fd5230b207b394809cd4238853 Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Wed, 14 Jun 2017 22:02:38 +0100 Subject: [PATCH 1/2] Handle etcd connection failures in etcd v3 watch API. --- etcd/v3/kv_etcd.go | 94 +++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index a90d87af..dc2b0450 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -15,6 +15,7 @@ import ( "golang.org/x/net/context" e "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/mvcc/mvccpb" @@ -29,6 +30,9 @@ const ( // Name is the name of this kvdb implementation. Name = "etcdv3-kv" defaultRequestTimeout = 10 * time.Second + // defaultSessionTimeout in seconds is used for etcd watch + // to detect connectivity issues + defaultSessionTimeout = 120 urlPrefix = "http://" ) @@ -129,7 +133,7 @@ func (et *etcdKV) Get(key string) (*kvdb.KVPair, error) { switch err { case context.DeadlineExceeded: - logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i) + logrus.Errorf("[get %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i) time.Sleep(ec.DefaultIntervalBetweenRetries) case etcdserver.ErrTimeout: logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i) @@ -282,7 +286,7 @@ func (et *etcdKV) Enumerate(prefix string) (kvdb.KVPairs, error) { switch err { case context.DeadlineExceeded: - logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i) + logrus.Errorf("[enumerate %v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, err, i) time.Sleep(ec.DefaultIntervalBetweenRetries) case etcdserver.ErrTimeout: logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i) @@ -390,7 +394,7 @@ func (et *etcdKV) Keys(prefix, sep string) ([]string, error) { cancel() switch err { case context.DeadlineExceeded: - logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i) + logrus.Errorf("[%v.%v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, sep, err, i) time.Sleep(ec.DefaultIntervalBetweenRetries) case etcdserver.ErrTimeout: logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i) @@ -715,7 +719,7 @@ func (et *etcdKV) setWithRetry(key, value string, ttl uint64) (*kvdb.KVPair, err handle_error: switch err { case context.DeadlineExceeded: - logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i) + logrus.Errorf("[set %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i) time.Sleep(ec.DefaultIntervalBetweenRetries) case etcdserver.ErrTimeout: logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i) @@ -803,43 +807,63 @@ func (et *etcdKV) watchStart( if waitIndex != 0 { opts = append(opts, e.WithRev(int64(waitIndex+1))) } + session, err := concurrency.NewSession(et.kvClient, concurrency.WithTTL(defaultSessionTimeout)) + if err != nil { + logrus.Errorf("Failed to establish session for etcd client watch: %v", err) + } watcher := e.NewWatcher(et.kvClient) - watchChan := watcher.Watch(context.Background(), key, opts...) - for wresp := range watchChan { - if wresp.Created == true { - continue - } - if wresp.Canceled == true { - // Watch is canceled. Notify the watcher - logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err()) - _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) - } else { - for _, ev := range wresp.Events { - var action string - if ev.Type == mvccpb.PUT { - if ev.Kv.Version == 1 { - action = "create" + ctx := context.Background() + + watchRet := make(chan error) + watchChan := watcher.Watch(ctx, key, opts...) + go func() { + for wresp := range watchChan { + if wresp.Created == true { + continue + } + if wresp.Canceled == true { + // Watch is canceled. Notify the watcher + logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err()) + _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) + } else { + for _, ev := range wresp.Events { + var action string + if ev.Type == mvccpb.PUT { + if ev.Kv.Version == 1 { + action = "create" + } else { + action = "set" + } + } else if ev.Type == mvccpb.DELETE { + action = "delete" } else { - action = "set" + action = "unknown" } - } else if ev.Type == mvccpb.DELETE { - action = "delete" - } else { - action = "unknown" - } - err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil) - if err != nil { - closeErr := watcher.Close() - // etcd server might close the context before us. - if closeErr != context.Canceled && closeErr != nil { - logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr) + err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil) + if err != nil { + closeErr := watcher.Close() + // etcd server might close the context before us. + if closeErr != context.Canceled && closeErr != nil { + logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr) + } + // Indicate the caller that watch has been canceled + _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) + watchRet <- err + break } - // Indicate the caller that watch has been canceled - _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) - break } } } + }() + + select { + case <-session.Done(): // closed by etcd + // Close the watcher + watcher.Close() + // Indicate the caller that watch has been canceled + _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) + case err := <-watchRet: // error in watcher + logrus.Errorf("Watch for %v stopped: %v",key, err) } } @@ -1138,7 +1162,7 @@ func (e *etcdKV) ListMembers() (map[string]*kvdb.MemberUrls, error) { resp := make(map[string]*kvdb.MemberUrls) for _, member := range memberListResponse.Members { resp[member.Name] = &kvdb.MemberUrls{ - PeerUrls: member.PeerURLs, + PeerUrls: member.PeerURLs, ClientUrls: member.ClientURLs, } } From aef05af370f0b7166c23ef51b09832ed1a37efd6 Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Thu, 15 Jun 2017 10:59:09 +0100 Subject: [PATCH 2/2] Add a cancellable context for Watch api --- etcd/v3/kv_etcd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index dc2b0450..ed7ac2d5 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -812,7 +812,7 @@ func (et *etcdKV) watchStart( logrus.Errorf("Failed to establish session for etcd client watch: %v", err) } watcher := e.NewWatcher(et.kvClient) - ctx := context.Background() + ctx, watchCancel := context.WithCancel(context.Background()) watchRet := make(chan error) watchChan := watcher.Watch(ctx, key, opts...) @@ -858,6 +858,8 @@ func (et *etcdKV) watchStart( select { case <-session.Done(): // closed by etcd + // Close the context + watchCancel() // Close the watcher watcher.Close() // Indicate the caller that watch has been canceled