Skip to content

Commit

Permalink
Merge pull request #413 from philips/event_history
Browse files Browse the repository at this point in the history
fix(event_history): fix a bug in event queue
  • Loading branch information
xiang90 committed Dec 22, 2013
2 parents 70c8c09 + e1d909e commit d455371
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 18 deletions.
2 changes: 1 addition & 1 deletion server/v2/get_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil {
return etcdErr.NewError(500, key, s.Store().Index())
return err
}

cn, _ := w.(http.CloseNotifier)
Expand Down
18 changes: 10 additions & 8 deletions store/event_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
return e
}

// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified key
// scan enumerates events from the index history and stops at the first point
// where the key matches.
func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()

// the index should locate after the event history's StartIndex
if index-eh.StartIndex < 0 {
// index should be after the event history's StartIndex
if index < eh.StartIndex {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), 0)
}

// the index should locate before the size of the queue minus the duplicate count
// the index should come before the size of the queue minus the duplicate count
if index > eh.LastIndex { // future index
return nil, nil
}

i := eh.Queue.Front
offset := index - eh.StartIndex
i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity

for {
e := eh.Queue.Events[i]
Expand All @@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
ok = ok || strings.HasPrefix(e.Node.Key, key)
}

if ok && index <= e.Index() { // make sure we bypass the smaller one
if ok {
return e, nil
}

i = (i + 1) % eh.Queue.Capacity

if i > eh.Queue.back() {
if i == eh.Queue.Back {
return nil, nil
}
}
Expand All @@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory {
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
Back: eh.Queue.Back,
}

for i, e := range eh.Queue.Events {
Expand Down
13 changes: 4 additions & 9 deletions store/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@ type eventQueue struct {
Events []*Event
Size int
Front int
Back int
Capacity int
}

func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}

func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity

eq.Events[index] = e
eq.Events[eq.Back] = e
eq.Back = (eq.Back + 1) % eq.Capacity

if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
eq.Front = (eq.Front + 1) % eq.Capacity
} else {
eq.Size++
}

}
20 changes: 20 additions & 0 deletions store/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) {
t.Fatalf("bad index shoud reuturn nil")
}
}

// TestFullEventQueue tests a queue with capacity = 10
// Add 1000 events into that queue, and test if scanning
// works still for previous events.
func TestFullEventQueue(t *testing.T) {

eh := newEventHistory(10)

// Add
for i := 0; i < 1000; i++ {
e := newEvent(Create, "/foo", uint64(i), uint64(i))
eh.addEvent(e)
e, err := eh.scan("/foo", true, uint64(i-1))
if i > 0 {
if e == nil || err != nil {
t.Fatalf("scan error [/foo] [%v] %v", i-1, i)
}
}
}
}

0 comments on commit d455371

Please sign in to comment.