Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
flush reorder buffer on GC
Browse files Browse the repository at this point in the history
  • Loading branch information
replay committed Dec 13, 2017
1 parent e111fd2 commit 2ff3e3b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
15 changes: 14 additions & 1 deletion mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,21 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
a.Lock()
defer a.Unlock()

// if the reorderBuffer is enabled and we have not received a datapoint in a while,
// then flush the reorder buffer.
if a.rob != nil && a.lastWrite < chunkMinTs {
tmpLastWrite := a.lastWrite
pts := a.rob.Flush()
for _, p := range pts {
a.add(p.Ts, p.Val)
}

// adding points will cause our lastWrite to be updated, but we want to keep the old value
a.lastWrite = tmpLastWrite
}

// this aggMetric has never had metrics written to it.
if len(a.Chunks) == 0 {
if len(a.Chunks) == 0 && !a.rob.HasData() {
return true
}

Expand Down
25 changes: 25 additions & 0 deletions mdata/reorder_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,28 @@ func (rob *ReorderBuffer) Get() []schema.Point {

return res
}

func (rob *ReorderBuffer) Reset() {
for i := range rob.buf {
rob.buf[i].Ts = 0
rob.buf[i].Val = 0
}
rob.newest = 0
}

func (rob *ReorderBuffer) Flush() []schema.Point {
res := rob.Get()
rob.Reset()

return res
}

func (rob *ReorderBuffer) HasData() bool {
for _, point := range rob.buf {
if point.Ts > 0 {
return true
}
}

return false
}
18 changes: 18 additions & 0 deletions mdata/reorder_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,24 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) {
}
}

func TestReorderBufferFlushAndHasData(t *testing.T) {
buf := NewReorderBuffer(10, 1)

if buf.HasData() != false {
t.Fatalf("Expected HasData() to be false")
}

buf.Add(123, 123)
if buf.HasData() != true {
t.Fatalf("Expected HasData() to be true")
}

buf.Reset()
if buf.HasData() != false {
t.Fatalf("Expected HasData() to be false")
}
}

func BenchmarkAddInOrder(b *testing.B) {
data := make([]schema.Point, b.N)
buf := NewReorderBuffer(uint32(b.N), 1)
Expand Down

0 comments on commit 2ff3e3b

Please sign in to comment.