Skip to content

Commit

Permalink
etcd: fix refresh feature
Browse files Browse the repository at this point in the history
When using refresh, etcd store v2 watch is broken. Although with refresh
store should not trigger current watchers, it should still add events into
the watchhub to make a complete history. Current store fails to add the event
into the watchhub, which causes issues.
  • Loading branch information
xiang90 authored and gyuho committed May 27, 2016
1 parent 20793a2 commit 3b100ad
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
7 changes: 4 additions & 3 deletions Documentation/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,11 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t

### Refreshing key TTL

Keys in etcd can be refreshed without notifying watchers
this can be achieved by setting the refresh to true when updating a TTL
Keys in etcd can be refreshed without notifying current watchers.

You cannot update the value of a key when refreshing it
This can be achieved by setting the refresh to true when updating a TTL.

You cannot update the value of a key when refreshing it.

```sh
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5
Expand Down
5 changes: 5 additions & 0 deletions store/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Event struct {
Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"-"`
Refresh bool `json:"refresh,omitempty"`
}

func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
Expand Down Expand Up @@ -64,3 +65,7 @@ func (e *Event) Clone() *Event {
PrevNode: e.PrevNode.Clone(),
}
}

func (e *Event) SetRefresh() {
e.Refresh = true
}
30 changes: 16 additions & 14 deletions store/event_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,26 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
for {
e := eh.Queue.Events[i]

ok := (e.Node.Key == key)
if !e.Refresh {
ok := (e.Node.Key == key)

if recursive {
// add tailing slash
key = path.Clean(key)
if key[len(key)-1] != '/' {
key = key + "/"
}
if recursive {
// add tailing slash
key = path.Clean(key)
if key[len(key)-1] != '/' {
key = key + "/"
}

ok = ok || strings.HasPrefix(e.Node.Key, key)
}
ok = ok || strings.HasPrefix(e.Node.Key, key)
}

if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
}
if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
}

if ok {
return e, nil
if ok {
return e, nil
}
}

i = (i + 1) % eh.Queue.Capacity
Expand Down
9 changes: 9 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptio

if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}

return e, nil
Expand Down Expand Up @@ -314,6 +317,9 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint

if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}

return e, nil
Expand Down Expand Up @@ -539,6 +545,9 @@ func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet

if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}

s.CurrentIndex = nextIndex
Expand Down
4 changes: 4 additions & 0 deletions store/watcher_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
return w, nil
}

func (wh *watcherHub) add(e *Event) {
e = wh.EventHistory.addEvent(e)
}

// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
Expand Down

0 comments on commit 3b100ad

Please sign in to comment.