From 4f6815207752dcfd4dc2e9a50cf0a3c1b2fa6179 Mon Sep 17 00:00:00 2001 From: Caleb Champlin Date: Mon, 21 Sep 2015 15:30:58 -0600 Subject: [PATCH] Add TTL refreshing via refresh parameter --- Documentation/api.md | 57 +++++++++-- etcdserver/etcdhttp/client.go | 17 ++++ etcdserver/etcdserverpb/etcdserver.pb.go | 31 +++++- etcdserver/etcdserverpb/etcdserver.proto | 1 + etcdserver/server.go | 13 +-- store/node.go | 2 +- store/stats_test.go | 2 +- store/store.go | 66 ++++++++---- store/store_test.go | 123 +++++++++++++++++++++-- 9 files changed, 267 insertions(+), 45 deletions(-) diff --git a/Documentation/api.md b/Documentation/api.md index 9b1faeecc14..bf982d5334c 100644 --- a/Documentation/api.md +++ b/Documentation/api.md @@ -234,6 +234,51 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t } ``` +### Refreshing key TTL + +Keys in etcd can be refreshed without updating the value or notifying watchers +this can be achieved by setting the refresh to true when updating a TTL + +```sh +curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5 +curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5 -d refresh=true -d prevExist=true +``` + +```json +{ + "action": "set", + "node": { + "createdIndex": 5, + "expiration": "2013-12-04T12:01:21.874888581-08:00", + "key": "/foo", + "modifiedIndex": 5, + "ttl": 5, + "value": "bar" + } +} +{ + "action":"update", + "node":{ + "key":"/foo", + "value":"bar", + "expiration": "2013-12-04T12:01:26.874888581-08:00", + "ttl":5, + "modifiedIndex":6, + "createdIndex":5 + }, + "prevNode":{ + "key":"/foo", + "value":"bar", + "expiration":"2013-12-04T12:01:21.874888581-08:00", + "ttl":3, + "modifiedIndex":5, + "createdIndex":5 + } +} +``` + + + ### Waiting for a change @@ -276,7 +321,7 @@ The first terminal should get the notification and return with the same response However, the watch command can do more than this. Using the index, we can watch for commands that have happened in the past. -This is useful for ensuring you don't miss events between watch commands. +This is useful for ensuring you don't miss events between watch commands. Typically, we watch again from the `modifiedIndex` + 1 of the node we got. Let's try to watch for the set command of index 7 again: @@ -296,13 +341,13 @@ curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=8' Then even if etcd is on index 9 or 800, the first event to occur to the `/foo` key between 8 and the current index will be returned. -**Note**: etcd only keeps the responses of the most recent 1000 events across all etcd keys. +**Note**: etcd only keeps the responses of the most recent 1000 events across all etcd keys. It is recommended to send the response to another thread to process immediately -instead of blocking the watch while processing the result. +instead of blocking the watch while processing the result. #### Watch from cleared event index -If we miss all the 1000 events, we need to recover the current state of the +If we miss all the 1000 events, we need to recover the current state of the watching key space through a get and then start to watch from the `X-Etcd-Index` + 1. @@ -324,7 +369,7 @@ To start watch, first we need to fetch the current state of key `/foo`: curl 'http://127.0.0.1:2379/v2/keys/foo' -vv ``` -``` +``` < HTTP/1.1 200 OK < Content-Type: application/json < X-Etcd-Cluster-Id: 7e27652122e8b2ae @@ -333,7 +378,7 @@ curl 'http://127.0.0.1:2379/v2/keys/foo' -vv < X-Raft-Term: 2 < Date: Mon, 05 Jan 2015 18:54:43 GMT < Transfer-Encoding: chunked -< +< {"action":"get","node":{"key":"/foo","value":"bar","modifiedIndex":7,"createdIndex":7}} ``` diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 64963798fd1..f77304a65c6 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -529,6 +529,19 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque pe = &bv } + // refresh is nullable, so leave it null if not specified + var refresh *bool + if _, ok := r.Form["refresh"]; ok { + bv, err := getBool(r.Form, "refresh") + if err != nil { + return emptyReq, etcdErr.NewRequestError( + etcdErr.EcodeInvalidField, + "invalid value for refresh", + ) + } + refresh = &bv + } + rr := etcdserverpb.Request{ Method: r.Method, Path: p, @@ -549,6 +562,10 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque rr.PrevExist = pe } + if refresh != nil { + rr.Refresh = *refresh + } + // Null TTL is equivalent to unset Expiration if ttl != nil { expr := time.Duration(*ttl) * time.Second diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 519e4865cee..d46a5fc0805 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -7,8 +7,6 @@ It is generated from these files: etcdserver.proto - raft_internal.proto - rpc.proto It has these top-level messages: Request @@ -45,6 +43,7 @@ type Request struct { Quorum bool `protobuf:"varint,14,opt" json:"Quorum"` Time int64 `protobuf:"varint,15,opt" json:"Time"` Stream bool `protobuf:"varint,16,opt" json:"Stream"` + Refresh bool `protobuf:"varint,17,opt" json:"Refresh"` XXX_unrecognized []byte `json:"-"` } @@ -168,6 +167,16 @@ func (m *Request) MarshalTo(data []byte) (int, error) { data[i] = 0 } i++ + data[i] = 0x88 + i++ + data[i] = 0x1 + i++ + if m.Refresh { + data[i] = 1 + } else { + data[i] = 0 + } + i++ if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -253,6 +262,7 @@ func (m *Request) Size() (n int) { n += 2 n += 1 + sovEtcdserver(uint64(m.Time)) n += 3 + n += 3 if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -606,6 +616,23 @@ func (m *Request) Unmarshal(data []byte) error { } } m.Stream = bool(v != 0) + case 17: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Refresh", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Refresh = bool(v != 0) default: var sizeOfWire int for { diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 024c3037f10..e3c7606c9df 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -25,6 +25,7 @@ message Request { optional bool Quorum = 14 [(gogoproto.nullable) = false]; optional int64 Time = 15 [(gogoproto.nullable) = false]; optional bool Stream = 16 [(gogoproto.nullable) = false]; + optional bool Refresh = 17 [(gogoproto.nullable) = false]; } message Metadata { diff --git a/etcdserver/server.go b/etcdserver/server.go index 61d4c8aaec6..e4857eb4159 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -794,23 +794,24 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { return Response{Event: ev, err: err} } expr := timeutil.UnixNanoToTime(r.Expiration) + ttlOptions := store.TTLOptionSet{ExpireTime: expr, Refresh: r.Refresh} switch r.Method { case "POST": - return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr)) + return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions)) case "PUT": exists, existsSet := pbutil.GetBool(r.PrevExist) switch { case existsSet: if exists { if r.PrevIndex == 0 && r.PrevValue == "" { - return f(s.store.Update(r.Path, r.Val, expr)) + return f(s.store.Update(r.Path, r.Val, ttlOptions)) } else { - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)) + return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) } } - return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr)) + return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) case r.PrevIndex > 0 || r.PrevValue != "": - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)) + return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) default: // TODO (yicheng): cluster should be the owner of cluster prefix store // we should not modify cluster store here. @@ -825,7 +826,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { if r.Path == path.Join(StoreClusterPrefix, "version") { s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) } - return f(s.store.Set(r.Path, r.Dir, r.Val, expr)) + return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) } case "DELETE": switch { diff --git a/store/node.go b/store/node.go index e41eec2aac7..dcadaed715d 100644 --- a/store/node.go +++ b/store/node.go @@ -31,7 +31,7 @@ const ( CompareNotMatch = 3 ) -var Permanent time.Time +var Permanent = TTLOptionSet{} // node is the basic element in the store system. // A key-value pair will have a string value diff --git a/store/stats_test.go b/store/stats_test.go index da1bac077d8..17aa4751c23 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -104,7 +104,7 @@ func TestStoreStatsExpireCount(t *testing.T) { fc := newFakeClock() s.clock = fc - s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) assert.Equal(t, uint64(0), s.Stats.ExpireCount, "") fc.Advance(600 * time.Millisecond) s.DeleteExpiredKeys(fc.Now()) diff --git a/store/store.go b/store/store.go index c925c62b336..cc29f4f3f65 100644 --- a/store/store.go +++ b/store/store.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/pkg/types" @@ -42,12 +43,12 @@ type Store interface { Index() uint64 Get(nodePath string, recursive, sorted bool) (*Event, error) - Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) - Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) + Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) + Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) Create(nodePath string, dir bool, value string, unique bool, - expireTime time.Time) (*Event, error) + expireOpts TTLOptionSet) (*Event, error) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, - value string, expireTime time.Time) (*Event, error) + value string, expireOpts TTLOptionSet) (*Event, error) Delete(nodePath string, dir, recursive bool) (*Event, error) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) @@ -63,6 +64,11 @@ type Store interface { DeleteExpiredKeys(cutoff time.Time) } +type TTLOptionSet struct { + ExpireTime time.Time + Refresh bool +} + type store struct { Root *node WatcherHub *watcherHub @@ -85,9 +91,9 @@ func New(namespaces ...string) Store { func newStore(namespaces ...string) *store { s := new(store) s.CurrentVersion = defaultVersion - s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent) + s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent.ExpireTime) for _, namespace := range namespaces { - s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent)) + s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent.ExpireTime)) } s.Stats = newStats() s.WatcherHub = newWatchHub(1000) @@ -146,10 +152,10 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) { // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) { +func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) { s.worldLock.Lock() defer s.worldLock.Unlock() - e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create) + e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create) if err == nil { e.EtcdIndex = s.CurrentIndex @@ -165,7 +171,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp } // Set creates or replace the node at nodePath. -func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) { +func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) { var err error s.worldLock.Lock() @@ -188,8 +194,12 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim return nil, err } + if expireOpts.Refresh && len(value) == 0 { + value = n.Value + } + // Set new value - e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set) + e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set) if err != nil { return nil, err } @@ -202,7 +212,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim e.PrevNode = prev.Node } - s.WatcherHub.notify(e) + if expireOpts.ExpireTime.IsZero() || !expireOpts.Refresh || ((getErr != nil && getErr.ErrorCode == etcdErr.EcodeKeyNotFound) || n.Value != value) { + s.WatcherHub.notify(e) + } return e, nil } @@ -220,7 +232,7 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) } func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, - value string, expireTime time.Time) (*Event, error) { + value string, expireOpts TTLOptionSet) (*Event, error) { s.worldLock.Lock() defer s.worldLock.Unlock() @@ -264,14 +276,15 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint // if test succeed, write the value n.Write(value, s.CurrentIndex) - n.UpdateTTL(expireTime) + n.UpdateTTL(expireOpts.ExpireTime) // copy the value for safety valueCopy := value eNode.Value = &valueCopy eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) - - s.WatcherHub.notify(e) + if expireOpts.ExpireTime.IsZero() || !expireOpts.Refresh || prevValue != value { + s.WatcherHub.notify(e) + } s.Stats.Inc(CompareAndSwapSuccess) reportWriteSuccess(CompareAndSwap) @@ -429,10 +442,12 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string return curr, nil } +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver") + // Update updates the value/ttl of the node. // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. -func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) { +func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) { s.worldLock.Lock() defer s.worldLock.Unlock() @@ -452,6 +467,11 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( return nil, err } + oldValue := n.Value + if expireOpts.Refresh && len(newValue) == 0 { + newValue = oldValue + } + e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex) e.EtcdIndex = nextIndex e.PrevNode = n.Repr(false, false, s.clock) @@ -475,11 +495,17 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( } // update ttl - n.UpdateTTL(expireTime) + n.UpdateTTL(expireOpts.ExpireTime) eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) - s.WatcherHub.notify(e) + plog.Infof("refresh %v", expireOpts.Refresh) + plog.Infof("values %v : %v", oldValue, newValue) + if expireOpts.ExpireTime.IsZero() || !expireOpts.Refresh || oldValue != newValue { + plog.Infof("passed refresh %v", expireOpts.Refresh) + plog.Infof("passed values %v : %v", oldValue, newValue) + s.WatcherHub.notify(e) + } s.Stats.Inc(UpdateSuccess) reportWriteSuccess(Update) @@ -508,7 +534,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique, // Assume expire times that are way in the past are // This can occur when the time is serialized to JS if expireTime.Before(minExpireTime) { - expireTime = Permanent + expireTime = Permanent.ExpireTime } dirName, nodeName := path.Split(nodePath) @@ -644,7 +670,7 @@ func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) { return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex) } - n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent) + n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent.ExpireTime) parent.Children[dirName] = n diff --git a/store/store_test.go b/store/store_test.go index 42f1807fb07..1c814fa99db 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -52,7 +52,7 @@ func TestMinExpireTime(t *testing.T) { s.clock = fc // FakeClock starts at 0, so minExpireTime should be far in the future.. but just in case assert.True(t, minExpireTime.After(fc.Now()), "minExpireTime should be ahead of FakeClock!") - s.Create("/foo", false, "Y", false, fc.Now().Add(3*time.Second)) + s.Create("/foo", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(3 * time.Second)}) fc.Advance(5 * time.Second) // Ensure it hasn't expired s.DeleteExpiredKeys(fc.Now()) @@ -77,7 +77,7 @@ func TestStoreGetDirectory(t *testing.T) { s.Create("/foo/baz", true, "", false, Permanent) s.Create("/foo/baz/bat", false, "Y", false, Permanent) s.Create("/foo/baz/_hidden", false, "*", false, Permanent) - s.Create("/foo/baz/ttl", false, "Y", false, fc.Now().Add(time.Second*3)) + s.Create("/foo/baz/ttl", false, "Y", false, TTLOptionSet{ExpireTime: fc.Now().Add(time.Second * 3)}) var eidx uint64 = 7 e, err := s.Get("/foo", true, false) assert.Nil(t, err, "") @@ -346,7 +346,7 @@ func TestStoreUpdateValueTTL(t *testing.T) { var eidx uint64 = 2 s.Create("/foo", false, "bar", false, Permanent) - _, err := s.Update("/foo", "baz", fc.Now().Add(500*time.Millisecond)) + _, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) e, _ := s.Get("/foo", false, false) assert.Equal(t, *e.Node.Value, "baz", "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -366,7 +366,7 @@ func TestStoreUpdateDirTTL(t *testing.T) { var eidx uint64 = 3 s.Create("/foo", true, "", false, Permanent) s.Create("/foo/bar", false, "baz", false, Permanent) - e, err := s.Update("/foo", "", fc.Now().Add(500*time.Millisecond)) + e, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) assert.Equal(t, e.Node.Dir, true, "") assert.Equal(t, e.EtcdIndex, eidx, "") e, _ = s.Get("/foo/bar", false, false) @@ -736,8 +736,8 @@ func TestStoreWatchExpire(t *testing.T) { s.clock = fc var eidx uint64 = 2 - s.Create("/foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) - s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(500*time.Millisecond)) + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) w, _ := s.Watch("/", true, false, 0) assert.Equal(t, w.StartIndex(), eidx, "") @@ -760,6 +760,111 @@ func TestStoreWatchExpire(t *testing.T) { assert.Equal(t, e.Node.Key, "/foofoo", "") } +// Ensure that the store can watch for key expiration when refreshing. +func TestStoreWatchExpireRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + var eidx uint64 = 2 + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1200 * time.Millisecond), Refresh: true}) + + // Make sure we set watch updates when Refresh is true for newly created keys + w, _ := s.Watch("/", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + c := w.EventChan() + e := nbselect(c) + assert.Nil(t, e, "") + fc.Advance(600 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 3 + e = nbselect(c) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foo", "") + + s.Update("/foofoo", "barbarbar", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + w, _ = s.Watch("/", true, false, 4) + fc.Advance(700 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 5 // We should skip 4 because a TTL update should occur with no watch notification + assert.Equal(t, w.StartIndex(), eidx-1, "") + e = nbselect(w.EventChan()) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foofoo", "") +} + +// Ensure that the store can watch for key expiration when refreshing with an empty value. +func TestStoreWatchExpireEmptyRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + var eidx uint64 = 1 + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + // Should be no-op + fc.Advance(200 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + + s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + w, _ := s.Watch("/", true, false, 2) + fc.Advance(700 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 3 // We should skip 2 because a TTL update should occur with no watch notification + assert.Equal(t, w.StartIndex(), eidx-1, "") + e := nbselect(w.EventChan()) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foo", "") + assert.Equal(t, *e.PrevNode.Value, "bar", "") +} + +// Ensure that the store can watch for key expiration when refreshing. +func TestStoreWatchExpireCompareRefresh(t *testing.T) { + s := newStore() + fc := newFakeClock() + s.clock = fc + + var eidx uint64 = 2 + s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1200 * time.Millisecond), Refresh: true}) + + // Make sure we set watch updates when Refresh is true for newly created keys + w, _ := s.Watch("/", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + c := w.EventChan() + s.CompareAndSwap("/foo", "bar", 0, "barbar", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + eidx = 3 + e := nbselect(c) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "compareAndSwap", "") + assert.Equal(t, e.Node.Key, "/foo", "") + + w, _ = s.Watch("/", true, false, 4) + assert.Equal(t, w.StartIndex(), eidx, "") + c = w.EventChan() + fc.Advance(600 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 4 + e = nbselect(c) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foo", "") + + s.CompareAndSwap("/foofoo", "barbarbar", 0, "barbarbar", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond), Refresh: true}) + w, _ = s.Watch("/", true, false, 5) + fc.Advance(700 * time.Millisecond) + s.DeleteExpiredKeys(fc.Now()) + eidx = 6 // We should skip 5 because a TTL update should occur with no watch notification + assert.Equal(t, w.StartIndex(), eidx-1, "") + e = nbselect(w.EventChan()) + assert.Equal(t, e.EtcdIndex, eidx, "") + assert.Equal(t, e.Action, "expire", "") + assert.Equal(t, e.Node.Key, "/foofoo", "") +} + // Ensure that the store can watch in streaming mode. func TestStoreWatchStream(t *testing.T) { s := newStore() @@ -822,7 +927,7 @@ func TestStoreRecoverWithExpiration(t *testing.T) { var eidx uint64 = 4 s.Create("/foo", true, "", false, Permanent) s.Create("/foo/x", false, "bar", false, Permanent) - s.Create("/foo/y", false, "baz", false, fc.Now().Add(5*time.Millisecond)) + s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: fc.Now().Add(5 * time.Millisecond)}) b, err := s.Save() time.Sleep(10 * time.Millisecond) @@ -929,8 +1034,8 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) { fc := newFakeClock() s.clock = fc - s.Create("/_foo", false, "bar", false, fc.Now().Add(500*time.Millisecond)) - s.Create("/foofoo", false, "barbarbar", false, fc.Now().Add(1000*time.Millisecond)) + s.Create("/_foo", false, "bar", false, TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + s.Create("/foofoo", false, "barbarbar", false, TTLOptionSet{ExpireTime: fc.Now().Add(1000 * time.Millisecond)}) w, _ := s.Watch("/", true, false, 0) c := w.EventChan()