diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go index 9a67ea2ae9b..abdb94f7eb3 100644 --- a/server/v2/get_handler.go +++ b/server/v2/get_handler.go @@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { // Start the watcher on the store. eventChan, err := s.Store().Watch(key, recursive, sinceIndex) if err != nil { - return etcdErr.NewError(500, key, s.Store().Index()) + return err } cn, _ := w.(http.CloseNotifier) diff --git a/store/event_history.go b/store/event_history.go index 19d781def3f..308932e0330 100644 --- a/store/event_history.go +++ b/store/event_history.go @@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event { return e } -// scan function is enumerating events from the index in history and -// stops till the first point where the key has identified key +// scan enumerates events from the index history and stops at the first point +// where the key matches. func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) { eh.rwl.RLock() defer eh.rwl.RUnlock() - // the index should locate after the event history's StartIndex - if index-eh.StartIndex < 0 { + // index should be after the event history's StartIndex + if index < eh.StartIndex { return nil, etcdErr.NewError(etcdErr.EcodeEventIndexCleared, fmt.Sprintf("the requested history has been cleared [%v/%v]", eh.StartIndex, index), 0) } - // the index should locate before the size of the queue minus the duplicate count + // the index should come before the size of the queue minus the duplicate count if index > eh.LastIndex { // future index return nil, nil } - i := eh.Queue.Front + offset := index - eh.StartIndex + i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity for { e := eh.Queue.Events[i] @@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, ok = ok || strings.HasPrefix(e.Node.Key, key) } - if ok && index <= e.Index() { // make sure we bypass the smaller one + if ok { return e, nil } i = (i + 1) % eh.Queue.Capacity - if i > eh.Queue.back() { + if i == eh.Queue.Back { return nil, nil } } @@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory { Events: make([]*Event, eh.Queue.Capacity), Size: eh.Queue.Size, Front: eh.Queue.Front, + Back: eh.Queue.Back, } for i, e := range eh.Queue.Events { diff --git a/store/event_queue.go b/store/event_queue.go index 0852956b1b8..e32bf4cc3a4 100644 --- a/store/event_queue.go +++ b/store/event_queue.go @@ -4,22 +4,17 @@ type eventQueue struct { Events []*Event Size int Front int + Back int Capacity int } -func (eq *eventQueue) back() int { - return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity -} - func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity - - eq.Events[index] = e + eq.Events[eq.Back] = e + eq.Back = (eq.Back + 1) % eq.Capacity if eq.Size == eq.Capacity { //dequeue - eq.Front = (index + 1) % eq.Capacity + eq.Front = (eq.Front + 1) % eq.Capacity } else { eq.Size++ } - } diff --git a/store/event_test.go b/store/event_test.go index a4579b72abc..8515cf767b9 100644 --- a/store/event_test.go +++ b/store/event_test.go @@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) { t.Fatalf("bad index shoud reuturn nil") } } + +// TestFullEventQueue tests a queue with capacity = 10 +// Add 1000 events into that queue, and test if scanning +// works still for previous events. +func TestFullEventQueue(t *testing.T) { + + eh := newEventHistory(10) + + // Add + for i := 0; i < 1000; i++ { + e := newEvent(Create, "/foo", uint64(i), uint64(i)) + eh.addEvent(e) + e, err := eh.scan("/foo", true, uint64(i-1)) + if i > 0 { + if e == nil || err != nil { + t.Fatalf("scan error [/foo] [%v] %v", i-1, i) + } + } + } +}