Handle etcd connection failures in etcd v3 watch API. #9

Merged · 2 commits · Jun 15, 2017
96 changes: 61 additions & 35 deletions etcd/v3/kv_etcd.go
@@ -15,6 +15,7 @@ import (
 	"golang.org/x/net/context"

 	e "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -29,6 +30,9 @@ const (
 	// Name is the name of this kvdb implementation.
 	Name = "etcdv3-kv"
 	defaultRequestTimeout = 10 * time.Second
+	// defaultSessionTimeout in seconds is used for etcd watch
+	// to detect connectivity issues
+	defaultSessionTimeout = 120
 	urlPrefix = "http://"
 )

@@ -129,7 +133,7 @@ func (et *etcdKV) Get(key string) (*kvdb.KVPair, error) {

 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[get %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -282,7 +286,7 @@ func (et *etcdKV) Enumerate(prefix string) (kvdb.KVPairs, error) {

 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[enumerate %v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -390,7 +394,7 @@ func (et *etcdKV) Keys(prefix, sep string) ([]string, error) {
 		cancel()
 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[%v.%v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, sep, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -715,7 +719,7 @@ func (et *etcdKV) setWithRetry(key, value string, ttl uint64) (*kvdb.KVPair, err
 handle_error:
 	switch err {
 	case context.DeadlineExceeded:
-		logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+		logrus.Errorf("[set %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
 		time.Sleep(ec.DefaultIntervalBetweenRetries)
 	case etcdserver.ErrTimeout:
 		logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -803,43 +807,65 @@ func (et *etcdKV) watchStart(
 	if waitIndex != 0 {
 		opts = append(opts, e.WithRev(int64(waitIndex+1)))
 	}
+	session, err := concurrency.NewSession(et.kvClient, concurrency.WithTTL(defaultSessionTimeout))
Contributor:
Per the following change, it may be better to create a cancel context?
etcd-io/etcd#6699
Also, when it retries, does the connection go to the next available etcd? (Does kvClient need to be refreshed?)
Otherwise looks good!
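For reference, a minimal sketch of the cancelable-watch pattern suggested there, reusing the identifiers from the diff above — not this PR's final code:

// Sketch only: canceling the context releases the server-side watch and
// closes the watch channel, per the pattern discussed in etcd-io/etcd#6699.
ctx, cancel := context.WithCancel(context.Background())
watchChan := et.kvClient.Watch(ctx, key)
go func() {
	for wresp := range watchChan {
		for _, ev := range wresp.Events {
			logrus.Infof("%s on %q", ev.Type, ev.Kv.Key)
		}
	}
}()
// later, on shutdown or fatal error:
cancel() // watchChan drains and closes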

Member Author:
Yes, I had that cancel context, but cancelling it does not have any effect on the goroutine which is handling the watch responses (watchChan). Will add it back.

Should we refresh the kvClient in the watch API?

The v2 docs say that the client goes to the next etcd - https://github.com/coreos/etcd/tree/master/client#caveat
But I could not find any doc or mention of this for clientv3.
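For context on the failover question: clientv3 takes the full endpoint list at construction time, and its balancer is expected to pick a reachable member; whether an in-flight watch stream migrates on failure is exactly what is unclear here. A minimal config sketch, with illustrative addresses:

// Sketch only: multiple endpoints let the clientv3 balancer try other
// members on dial failures; this alone does not guarantee that a hung
// watch stream recovers.
kvClient, err := e.New(e.Config{
	Endpoints: []string{
		"http://192.168.0.1:2379",
		"http://192.168.0.2:2379",
		"http://192.168.0.3:2379",
	},
	DialTimeout: 5 * time.Second,
})
if err != nil {
	logrus.Fatalf("failed to create etcd client: %v", err)
}
defer kvClient.Close()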

Member Author:
Probably the client is fixed in v3.2.0.
I am following up with etcd here - etcd-io/etcd#7941

And if that works, then we might not need this change at all.

Contributor:
gr8. Let me know if you want me to follow up.

Member Author:
In a single-node etcd server, if it goes down, the watch will still be hung and won't return. Looks like we will still need this change anyway. I am still running a test to see if v3.2.0 solves the reconnection issue.
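That hang is the failure mode the session is meant to catch. A rough sketch of the mechanism, reusing identifiers from the diff (TTL value illustrative):

// Sketch only: the session holds a lease that is kept alive in the
// background. If the lone etcd server dies, keepalives stop, the lease
// expires, and session.Done() is closed, so the select can give up on
// the otherwise-hung watch instead of blocking forever.
session, err := concurrency.NewSession(et.kvClient, concurrency.WithTTL(120))
if err != nil {
	logrus.Errorf("failed to establish watch session: %v", err)
}
select {
case <-session.Done():
	// connectivity lost: cancel the watch context and notify the caller
case err := <-watchRet:
	// the watch goroutine exited on its own
	logrus.Errorf("watch stopped: %v", err)
}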

Contributor:
Just to clarify: with this change, in a single-node etcd, the session should terminate, right? (Ignoring the reconnection issue?)

+	if err != nil {
+		logrus.Errorf("Failed to establish session for etcd client watch: %v", err)
+	}
 	watcher := e.NewWatcher(et.kvClient)
-	watchChan := watcher.Watch(context.Background(), key, opts...)
-	for wresp := range watchChan {
-		if wresp.Created == true {
-			continue
-		}
-		if wresp.Canceled == true {
-			// Watch is canceled. Notify the watcher
-			logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
-			_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
-		} else {
-			for _, ev := range wresp.Events {
-				var action string
-				if ev.Type == mvccpb.PUT {
-					if ev.Kv.Version == 1 {
-						action = "create"
-					} else {
-						action = "set"
-					}
-				} else if ev.Type == mvccpb.DELETE {
-					action = "delete"
-				} else {
-					action = "unknown"
-				}
-				err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
-				if err != nil {
-					closeErr := watcher.Close()
-					// etcd server might close the context before us.
-					if closeErr != context.Canceled && closeErr != nil {
-						logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
-					}
-					// Indicate the caller that watch has been canceled
-					_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
-					break
-				}
-			}
-		}
-	}
+	ctx, watchCancel := context.WithCancel(context.Background())
+
+	watchRet := make(chan error)
+	watchChan := watcher.Watch(ctx, key, opts...)
+	go func() {
+		for wresp := range watchChan {
+			if wresp.Created == true {
+				continue
+			}
+			if wresp.Canceled == true {
+				// Watch is canceled. Notify the watcher
+				logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
+				_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
+			} else {
+				for _, ev := range wresp.Events {
+					var action string
+					if ev.Type == mvccpb.PUT {
+						if ev.Kv.Version == 1 {
+							action = "create"
+						} else {
+							action = "set"
+						}
+					} else if ev.Type == mvccpb.DELETE {
+						action = "delete"
+					} else {
+						action = "unknown"
+					}
+					err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
+					if err != nil {
+						closeErr := watcher.Close()
+						// etcd server might close the context before us.
+						if closeErr != context.Canceled && closeErr != nil {
+							logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
+						}
+						// Indicate the caller that watch has been canceled
+						_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
+						watchRet <- err
+						break
+					}
+				}
+			}
+		}
+	}()
+
+	select {
+	case <-session.Done(): // closed by etcd
+		// Close the context
+		watchCancel()
+		// Close the watcher
+		watcher.Close()
+		// Indicate the caller that watch has been canceled
+		_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
+	case err := <-watchRet: // error in watcher
+		logrus.Errorf("Watch for %v stopped: %v", key, err)
+	}

@@ -1138,7 +1164,7 @@ func (e *etcdKV) ListMembers() (map[string]*kvdb.MemberUrls, error) {
 	resp := make(map[string]*kvdb.MemberUrls)
 	for _, member := range memberListResponse.Members {
 		resp[member.Name] = &kvdb.MemberUrls{
-			PeerUrls: member.PeerURLs,
+			PeerUrls:   member.PeerURLs,
 			ClientUrls: member.ClientURLs,
 		}
 	}
Expand Down