From ef988020b70d08fa6498905e718bbdaae1b6af7a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 21 Dec 2013 15:56:16 +0800 Subject: [PATCH 1/4] fix(event_history) fix a bug in event queue --- server/v2/get_handler.go | 2 +- store/event_history.go | 5 +++-- store/event_queue.go | 13 ++++--------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 9a67ea2ae9b..abdb94f7eb3 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { // Start the watcher on the store. eventChan, err := s.Store().Watch(key, recursive, sinceIndex) if err != nil { - return etcdErr.NewError(500, key, s.Store().Index()) + return err } cn, _ := w.(http.CloseNotifier) diff --git a/store/event_history.go b/store/event_history.go index 19d781def3f..cb3ab49c222 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -46,7 +46,7 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, defer eh.rwl.RUnlock() // the index should locate after the event history's StartIndex - if index-eh.StartIndex < 0 { + if index < eh.StartIndex { return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, fmt.Sprintf("the requested history has been cleared [%v/%v]", @@ -81,7 +81,7 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, i = (i + 1) % eh.Queue.Capacity - if i > eh.Queue.back() { + if i == eh.Queue.Back { return nil, nil } } @@ -95,6 +95,7 @@ func (eh *EventHistory) clone() *EventHistory { Events: make([]*Event, eh.Queue.Capacity), Size: eh.Queue.Size, Front: eh.Queue.Front, + Back: eh.Queue.Back, } for i, e := range eh.Queue.Events { diff --git a/store/event_queue.go b/store/event_queue.go index 0852956b1b8..e32bf4cc3a4 100644 --- a/store/event_queue.go +++ b/store/event_queue.go @@ -4,22 +4,17 @@ type eventQueue struct { Events []*Event Size int Front int + Back int Capacity int } -func (eq *eventQueue) back() int { - return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity -} - func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity - - eq.Events[index] = e + eq.Events[eq.Back] = e + eq.Back = (eq.Back + 1) % eq.Capacity if eq.Size == eq.Capacity { //dequeue - eq.Front = (index + 1) % eq.Capacity + eq.Front = (eq.Front + 1) % eq.Capacity } else { eq.Size++ } - } From 0937b4d2668f3d2a70d27a5cda8ee1158f5e172b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 21 Dec 2013 19:29:05 +0800 Subject: [PATCH 2/4] refactor(event_history.go) remove the extra logic --- store/event_history.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/store/event_history.go b/store/event_history.go index cb3ab49c222..02afb941cf0 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -58,7 +58,8 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, return nil, nil } - i := eh.Queue.Front + offset := index - eh.StartIndex + i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity for { e := eh.Queue.Events[i] @@ -75,7 +76,7 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, ok = ok || strings.HasPrefix(e.Node.Key, key) } - if ok && index <= e.Index() { // make sure we bypass the smaller one + if ok { return e, nil } From 317b34f4a0caa3478a738fddd9815587b91611e2 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 22 Dec 2013 15:41:44 -0800 Subject: [PATCH 3/4] refactor(store/event_history): cleanup some comments --- store/event_history.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/event_history.go b/store/event_history.go index 02afb941cf0..308932e0330 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -39,13 +39,13 @@ func (eh *EventHistory) addEvent(e *Event) *Event { return e } -// scan function is enumerating events from the index in history and -// stops till the first point where the key has identified key +// scan enumerates events from the index history and stops at the first point +// where the key matches. func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - // the index should locate after the event history's StartIndex + // index should be after the event history's StartIndex if index < eh.StartIndex { return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, @@ -53,7 +53,7 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, eh.StartIndex, index), 0) } - // the index should locate before the size of the queue minus the duplicate count + // the index should come before the size of the queue minus the duplicate count if index > eh.LastIndex { // future index return nil, nil } From e1d909eb0e6aad3c1e7b0e3e63fb6f76f0fc9f2c Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 22 Dec 2013 15:38:59 -0800 Subject: [PATCH 4/4] test(store/event_test): add a test for a full queue --- store/event_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/store/event_test.go b/store/event_test.go index a4579b72abc..8515cf767b9 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) { t.Fatalf("bad index shoud reuturn nil") } } + +// TestFullEventQueue tests a queue with capacity = 10 +// Add 1000 events into that queue, and test if scanning +// works still for previous events. +func TestFullEventQueue(t *testing.T) { + + eh := newEventHistory(10) + + // Add + for i := 0; i < 1000; i++ { + e := newEvent(Create, "/foo", uint64(i), uint64(i)) + eh.addEvent(e) + e, err := eh.scan("/foo", true, uint64(i-1)) + if i > 0 { + if e == nil || err != nil { + t.Fatalf("scan error [/foo] [%v] %v", i-1, i) + } + } + } +}