Skip to content

Commit

Permalink
Merge pull request #9 from portworx/etcd_v3_watch
Browse files Browse the repository at this point in the history
Handle etcd connection failures in etcd v3 watch API.
  • Loading branch information
adityadani committed Jun 15, 2017
2 parents 8cf54cd + aef05af commit e39da5b
Showing 1 changed file with 61 additions and 35 deletions.
96 changes: 61 additions & 35 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"golang.org/x/net/context"

e "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc/mvccpb"
Expand All @@ -29,6 +30,9 @@ const (
// Name is the name of this kvdb implementation.
Name = "etcdv3-kv"
defaultRequestTimeout = 10 * time.Second
// defaultSessionTimeout in seconds is used for etcd watch
// to detect connectivity issues
defaultSessionTimeout = 120
urlPrefix = "http://"
)

Expand Down Expand Up @@ -129,7 +133,7 @@ func (et *etcdKV) Get(key string) (*kvdb.KVPair, error) {

switch err {
case context.DeadlineExceeded:
logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
logrus.Errorf("[get %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
time.Sleep(ec.DefaultIntervalBetweenRetries)
case etcdserver.ErrTimeout:
logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
Expand Down Expand Up @@ -282,7 +286,7 @@ func (et *etcdKV) Enumerate(prefix string) (kvdb.KVPairs, error) {

switch err {
case context.DeadlineExceeded:
logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
logrus.Errorf("[enumerate %v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, err, i)
time.Sleep(ec.DefaultIntervalBetweenRetries)
case etcdserver.ErrTimeout:
logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
Expand Down Expand Up @@ -390,7 +394,7 @@ func (et *etcdKV) Keys(prefix, sep string) ([]string, error) {
cancel()
switch err {
case context.DeadlineExceeded:
logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
logrus.Errorf("[%v.%v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, sep, err, i)
time.Sleep(ec.DefaultIntervalBetweenRetries)
case etcdserver.ErrTimeout:
logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
Expand Down Expand Up @@ -715,7 +719,7 @@ func (et *etcdKV) setWithRetry(key, value string, ttl uint64) (*kvdb.KVPair, err
handle_error:
switch err {
case context.DeadlineExceeded:
logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
logrus.Errorf("[set %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
time.Sleep(ec.DefaultIntervalBetweenRetries)
case etcdserver.ErrTimeout:
logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
Expand Down Expand Up @@ -803,43 +807,65 @@ func (et *etcdKV) watchStart(
if waitIndex != 0 {
opts = append(opts, e.WithRev(int64(waitIndex+1)))
}
session, err := concurrency.NewSession(et.kvClient, concurrency.WithTTL(defaultSessionTimeout))
if err != nil {
logrus.Errorf("Failed to establish session for etcd client watch: %v", err)
}
watcher := e.NewWatcher(et.kvClient)
watchChan := watcher.Watch(context.Background(), key, opts...)
for wresp := range watchChan {
if wresp.Created == true {
continue
}
if wresp.Canceled == true {
// Watch is canceled. Notify the watcher
logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
} else {
for _, ev := range wresp.Events {
var action string
if ev.Type == mvccpb.PUT {
if ev.Kv.Version == 1 {
action = "create"
ctx, watchCancel := context.WithCancel(context.Background())

watchRet := make(chan error)
watchChan := watcher.Watch(ctx, key, opts...)
go func() {
for wresp := range watchChan {
if wresp.Created == true {
continue
}
if wresp.Canceled == true {
// Watch is canceled. Notify the watcher
logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
} else {
for _, ev := range wresp.Events {
var action string
if ev.Type == mvccpb.PUT {
if ev.Kv.Version == 1 {
action = "create"
} else {
action = "set"
}
} else if ev.Type == mvccpb.DELETE {
action = "delete"
} else {
action = "set"
action = "unknown"
}
} else if ev.Type == mvccpb.DELETE {
action = "delete"
} else {
action = "unknown"
}
err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
if err != nil {
closeErr := watcher.Close()
// etcd server might close the context before us.
if closeErr != context.Canceled && closeErr != nil {
logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
if err != nil {
closeErr := watcher.Close()
// etcd server might close the context before us.
if closeErr != context.Canceled && closeErr != nil {
logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
}
// Indicate the caller that watch has been canceled
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
watchRet <- err
break
}
// Indicate the caller that watch has been canceled
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
break
}
}
}
}()

select {
case <-session.Done(): // closed by etcd
// Close the context
watchCancel()
// Close the watcher
watcher.Close()
// Indicate the caller that watch has been canceled
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
case err := <-watchRet: // error in watcher
logrus.Errorf("Watch for %v stopped: %v",key, err)
}
}

Expand Down Expand Up @@ -1138,7 +1164,7 @@ func (e *etcdKV) ListMembers() (map[string]*kvdb.MemberUrls, error) {
resp := make(map[string]*kvdb.MemberUrls)
for _, member := range memberListResponse.Members {
resp[member.Name] = &kvdb.MemberUrls{
PeerUrls: member.PeerURLs,
PeerUrls: member.PeerURLs,
ClientUrls: member.ClientURLs,
}
}
Expand Down

0 comments on commit e39da5b

Please sign in to comment.