Skip to content

Commit

Permalink
Etcdv3: Watch API Improvements
Browse files Browse the repository at this point in the history
1. Send an ErrWatchStopped to the caller only once.
- Currently, ErrWatchStopped is sent to the caller multiple
  times, which causes a resubscribing watch to fail as well.

2. Use context with leader requirement for Watch API.

- By default, etcd watchers hang during a network partition if they are connected
  to the minority side of the partition.
- As mentioned in etcd-io/etcd#7247 (comment), setting the leader requirement
  for watchers allows them to switch to the majority partition.
  • Loading branch information
adityadani committed Nov 24, 2018
1 parent cbc0d30 commit 1b0ac1b
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ func (w *watchQ) start() {
if err != nil {
w.done = true
logrus.Infof("Watch cb for key %v returned err: %v", key, err)
// Indicate the caller that watch has been canceled
_ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped)
if err != kvdb.ErrWatchStopped {
// The caller returned an error. Indicate the caller
// that the watch has been stopped
_ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped)
} // else we stopped the watch and the caller has been notified
// Indicate that watch is returning.
close(w.watchRet)
break
Expand Down Expand Up @@ -905,8 +908,10 @@ func (et *etcdKV) watchStart(
}
sessionChan := make(chan int, 1)
var (
session *concurrency.Session
err error
session *concurrency.Session
err error
watchStopLock sync.Mutex
watchStopped bool
)
go func() {
session, err = concurrency.NewSession(
Expand All @@ -928,7 +933,7 @@ func (et *etcdKV) watchStart(
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
return
}
ctx, watchCancel := context.WithCancel(context.Background())
ctx, watchCancel := context.WithCancel(getContextWithLeaderRequirement())
watchRet := make(chan error)
watchChan := et.kvClient.Watch(ctx, key, opts...)
watchQ := newWatchQ(opaque, cb, watchRet)
Expand Down Expand Up @@ -964,14 +969,26 @@ func (et *etcdKV) watchStart(
}
}
logrus.Errorf("Watch on key %v closed without a Cancel response.", key)
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopLock.Lock()
// Stop the watch only if it has not been stopped already
if !watchStopped {
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopped = true
}
watchStopLock.Unlock()
}()

select {
case <-session.Done(): // closed by etcd
// Indicate the caller that watch has been canceled
logrus.Errorf("Watch closing session for key: %v", key)
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopLock.Lock()
// Stop the watch only if it has not been stopped already
if !watchStopped {
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopped = true
}
watchStopLock.Unlock()
watchCancel()
case <-watchRet: // error in watcher
// Close the context
Expand Down

0 comments on commit 1b0ac1b

Please sign in to comment.