Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
update lastWrite on write into rob and flush rob on GC
Browse files Browse the repository at this point in the history
  • Loading branch information
replay committed Dec 13, 2017
1 parent 0c94d79 commit fc64bb5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 18 deletions.
22 changes: 20 additions & 2 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,12 @@ func (a *AggMetric) Add(ts uint32, val float64) {
a.add(ts, val)
} else {
// write through reorder buffer
res := a.rob.Add(ts, val)
res, accepted := a.rob.Add(ts, val)

if len(res) == 0 && accepted {
a.lastWrite = uint32(time.Now().Unix())
}

for _, p := range res {
a.add(p.Ts, p.Val)
}
Expand Down Expand Up @@ -541,8 +546,21 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
a.Lock()
defer a.Unlock()

// if the reorderBuffer is enabled and we have not received a datapoint in a while,
// then flush the reorder buffer.
if a.rob != nil && a.lastWrite < chunkMinTs {
tmpLastWrite := a.lastWrite
pts := a.rob.Flush()
for _, p := range pts {
a.add(p.Ts, p.Val)
}

// adding points will cause our lastWrite to be updated, but we want to keep the old value
a.lastWrite = tmpLastWrite
}

// this aggMetric has never had metrics written to it.
if len(a.Chunks) == 0 {
if len(a.Chunks) == 0 && (a.rob == nil || !a.rob.HasData()) {
return true
}

Expand Down
25 changes: 22 additions & 3 deletions mdata/reorder_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func NewReorderBuffer(reorderWindow uint32, interval int) *ReorderBuffer {
return buf
}

func (rob *ReorderBuffer) Add(ts uint32, val float64) []schema.Point {
func (rob *ReorderBuffer) Add(ts uint32, val float64) ([]schema.Point, bool) {
ts = AggBoundary(ts, rob.interval)

// out of order and too old
if rob.buf[rob.newest].Ts != 0 && ts <= rob.buf[rob.newest].Ts-(rob.len*rob.interval) {
metricsTooOld.Inc()
return nil
return nil, false
}

var res []schema.Point
Expand Down Expand Up @@ -59,7 +59,7 @@ func (rob *ReorderBuffer) Add(ts uint32, val float64) []schema.Point {
rob.buf[index].Val = val
}

return res
return res, true
}

// returns all the data in the buffer as a raw list of points
Expand All @@ -79,3 +79,22 @@ func (rob *ReorderBuffer) Get() []schema.Point {

return res
}

func (rob *ReorderBuffer) Reset() {
for i := range rob.buf {
rob.buf[i].Ts = 0
rob.buf[i].Val = 0
}
rob.newest = 0
}

func (rob *ReorderBuffer) Flush() []schema.Point {
res := rob.Get()
rob.Reset()

return res
}

func (rob *ReorderBuffer) HasData() bool {
return rob.buf[rob.newest].Ts != 0
}
53 changes: 40 additions & 13 deletions mdata/reorder_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@ import (
"gopkg.in/raintank/schema.v1"
)

func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAddFail, expectReordered uint32) []schema.Point {
func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAdded, expectAddFail, expectReordered uint32) []schema.Point {
var flushed []schema.Point
b := NewReorderBuffer(reorderWindow, 1)
metricsTooOld.SetUint32(0)
metricsReordered.SetUint32(0)
addedCount := uint32(0)
for _, point := range testData {
addRes := b.Add(point.Ts, point.Val)
addRes, accepted := b.Add(point.Ts, point.Val)
flushed = append(flushed, addRes...)
if accepted {
addedCount++
}
}
if expectAddFail != metricsTooOld.Peek() {
t.Fatalf("Expected %d failures, but had %d", expectAddFail, metricsTooOld.Peek())
}

if expectAdded != addedCount {
t.Fatalf("Expected %d accepted datapoints, but had %d", expectAdded, addedCount)
}

if metricsReordered.Peek() != expectReordered {
t.Fatalf("Expected %d metrics to get reordered, but had %d", expectReordered, metricsReordered.Peek())
}
Expand Down Expand Up @@ -95,7 +104,7 @@ func TestReorderBufferAddAndGetInOrder(t *testing.T) {
{Ts: 1002, Val: 200},
{Ts: 1003, Val: 300},
}
testAddAndGet(t, 600, testData, expectedData, 0, 0)
testAddAndGet(t, 600, testData, expectedData, 3, 0, 0)
}

func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) {
Expand All @@ -107,7 +116,7 @@ func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) {
expectedData := []schema.Point{
{Ts: 1003, Val: 300},
}
testAddAndGet(t, 1, testData, expectedData, 2, 0)
testAddAndGet(t, 1, testData, expectedData, 1, 2, 0)
}

func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) {
Expand All @@ -133,7 +142,7 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) {
{Ts: 1008, Val: 800},
{Ts: 1009, Val: 900},
}
testAddAndGet(t, 600, testData, expectedData, 0, 2)
testAddAndGet(t, 600, testData, expectedData, 9, 0, 2)
}

func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) {
Expand All @@ -159,13 +168,13 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T)
{Ts: 1008, Val: 800},
{Ts: 1009, Val: 900},
}
testAddAndGet(t, 600, testData, expectedData, 0, 3)
testAddAndGet(t, 600, testData, expectedData, 9, 0, 3)
}

func TestReorderBufferOmitFlushIfNotEnoughData(t *testing.T) {
b := NewReorderBuffer(9, 1)
for i := uint32(1); i < 10; i++ {
flushed := b.Add(i, float64(i*100))
flushed, _ := b.Add(i, float64(i*100))
if len(flushed) > 0 {
t.Fatalf("Expected no data to get flushed out")
}
Expand Down Expand Up @@ -197,7 +206,7 @@ func TestReorderBufferAddAndGetOutOfOrderOutOfWindow(t *testing.T) {
{Ts: 1003, Val: 300},
{Ts: 1004, Val: 400},
}
flushedData := testAddAndGet(t, 5, testData, expectedData, 1, 2)
flushedData := testAddAndGet(t, 5, testData, expectedData, 8, 1, 2)
if !reflect.DeepEqual(flushedData, expectedFlushedData) {
t.Fatalf("Flushed data does not match expected flushed data:\n%+v\n%+v", flushedData, expectedFlushedData)
}
Expand All @@ -208,8 +217,8 @@ func TestReorderBufferFlushSortedData(t *testing.T) {
buf := NewReorderBuffer(600, 1)
metricsTooOld.SetUint32(0)
for i := 1100; i < 2100; i++ {
flushed := buf.Add(uint32(i), float64(i))
if metricsTooOld.Peek() != 0 {
flushed, accepted := buf.Add(uint32(i), float64(i))
if metricsTooOld.Peek() != 0 || !accepted {
t.Fatalf("Adding failed")
}
results = append(results, flushed...)
Expand Down Expand Up @@ -238,8 +247,8 @@ func TestReorderBufferFlushUnsortedData1(t *testing.T) {
failedCount := 0
metricsTooOld.SetUint32(0)
for _, p := range data {
flushed := buf.Add(p.Ts, p.Val)
if metricsTooOld.Peek() != 0 {
flushed, accepted := buf.Add(p.Ts, p.Val)
if metricsTooOld.Peek() != 0 || !accepted {
failedCount++
metricsTooOld.SetUint32(0)
} else {
Expand Down Expand Up @@ -272,7 +281,7 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) {
}
unsortedData := unsort(data, 10)
for i := 0; i < len(data); i++ {
flushed := buf.Add(unsortedData[i].Ts, unsortedData[i].Val)
flushed, _ := buf.Add(unsortedData[i].Ts, unsortedData[i].Val)
results = append(results, flushed...)
}
for i := 0; i < 400; i++ {
Expand All @@ -282,6 +291,24 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) {
}
}

func TestReorderBufferFlushAndHasData(t *testing.T) {
buf := NewReorderBuffer(10, 1)

if buf.HasData() != false {
t.Fatalf("Expected HasData() to be false")
}

buf.Add(123, 123)
if buf.HasData() != true {
t.Fatalf("Expected HasData() to be true")
}

buf.Reset()
if buf.HasData() != false {
t.Fatalf("Expected HasData() to be false")
}
}

func BenchmarkAddInOrder(b *testing.B) {
data := make([]schema.Point, b.N)
buf := NewReorderBuffer(uint32(b.N), 1)
Expand Down

0 comments on commit fc64bb5

Please sign in to comment.