Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
replay committed Dec 12, 2017
1 parent 145b3d8 commit fad1f1c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
3 changes: 2 additions & 1 deletion mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ func (a *AggMetric) Add(ts uint32, val float64) {
} else {
// write through reorder buffer
res, accepted := a.rob.Add(ts, val)
if accepted {

if len(res) == 0 && accepted {
a.lastWrite = uint32(time.Now().Unix())
}

Expand Down
35 changes: 22 additions & 13 deletions mdata/reorder_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@ import (
"gopkg.in/raintank/schema.v1"
)

func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAddFail, expectReordered uint32) []schema.Point {
func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAdded, expectAddFail, expectReordered uint32) []schema.Point {
var flushed []schema.Point
b := NewReorderBuffer(reorderWindow, 1)
metricsTooOld.SetUint32(0)
metricsReordered.SetUint32(0)
addedCount := uint32(0)
for _, point := range testData {
addRes := b.Add(point.Ts, point.Val)
addRes, accepted := b.Add(point.Ts, point.Val)
flushed = append(flushed, addRes...)
if accepted {
addedCount++
}
}
if expectAddFail != metricsTooOld.Peek() {
t.Fatalf("Expected %d failures, but had %d", expectAddFail, metricsTooOld.Peek())
}

if expectAdded != addedCount {
t.Fatalf("Expected %d accepted datapoints, but had %d", expectAdded, addedCount)
}

if metricsReordered.Peek() != expectReordered {
t.Fatalf("Expected %d metrics to get reordered, but had %d", expectReordered, metricsReordered.Peek())
}
Expand Down Expand Up @@ -95,7 +104,7 @@ func TestReorderBufferAddAndGetInOrder(t *testing.T) {
{Ts: 1002, Val: 200},
{Ts: 1003, Val: 300},
}
testAddAndGet(t, 600, testData, expectedData, 0, 0)
testAddAndGet(t, 600, testData, expectedData, 3, 0, 0)
}

func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) {
Expand All @@ -107,7 +116,7 @@ func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) {
expectedData := []schema.Point{
{Ts: 1003, Val: 300},
}
testAddAndGet(t, 1, testData, expectedData, 2, 0)
testAddAndGet(t, 1, testData, expectedData, 1, 2, 0)
}

func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) {
Expand All @@ -133,7 +142,7 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) {
{Ts: 1008, Val: 800},
{Ts: 1009, Val: 900},
}
testAddAndGet(t, 600, testData, expectedData, 0, 2)
testAddAndGet(t, 600, testData, expectedData, 9, 0, 2)
}

func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) {
Expand All @@ -159,13 +168,13 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T)
{Ts: 1008, Val: 800},
{Ts: 1009, Val: 900},
}
testAddAndGet(t, 600, testData, expectedData, 0, 3)
testAddAndGet(t, 600, testData, expectedData, 9, 0, 3)
}

func TestReorderBufferOmitFlushIfNotEnoughData(t *testing.T) {
b := NewReorderBuffer(9, 1)
for i := uint32(1); i < 10; i++ {
flushed := b.Add(i, float64(i*100))
flushed, _ := b.Add(i, float64(i*100))
if len(flushed) > 0 {
t.Fatalf("Expected no data to get flushed out")
}
Expand Down Expand Up @@ -197,7 +206,7 @@ func TestReorderBufferAddAndGetOutOfOrderOutOfWindow(t *testing.T) {
{Ts: 1003, Val: 300},
{Ts: 1004, Val: 400},
}
flushedData := testAddAndGet(t, 5, testData, expectedData, 1, 2)
flushedData := testAddAndGet(t, 5, testData, expectedData, 8, 1, 2)
if !reflect.DeepEqual(flushedData, expectedFlushedData) {
t.Fatalf("Flushed data does not match expected flushed data:\n%+v\n%+v", flushedData, expectedFlushedData)
}
Expand All @@ -208,8 +217,8 @@ func TestReorderBufferFlushSortedData(t *testing.T) {
buf := NewReorderBuffer(600, 1)
metricsTooOld.SetUint32(0)
for i := 1100; i < 2100; i++ {
flushed := buf.Add(uint32(i), float64(i))
if metricsTooOld.Peek() != 0 {
flushed, accepted := buf.Add(uint32(i), float64(i))
if metricsTooOld.Peek() != 0 || !accepted {
t.Fatalf("Adding failed")
}
results = append(results, flushed...)
Expand Down Expand Up @@ -238,8 +247,8 @@ func TestReorderBufferFlushUnsortedData1(t *testing.T) {
failedCount := 0
metricsTooOld.SetUint32(0)
for _, p := range data {
flushed := buf.Add(p.Ts, p.Val)
if metricsTooOld.Peek() != 0 {
flushed, accepted := buf.Add(p.Ts, p.Val)
if metricsTooOld.Peek() != 0 || !accepted {
failedCount++
metricsTooOld.SetUint32(0)
} else {
Expand Down Expand Up @@ -272,7 +281,7 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) {
}
unsortedData := unsort(data, 10)
for i := 0; i < len(data); i++ {
flushed := buf.Add(unsortedData[i].Ts, unsortedData[i].Val)
flushed, _ := buf.Add(unsortedData[i].Ts, unsortedData[i].Val)
results = append(results, flushed...)
}
for i := 0; i < 400; i++ {
Expand Down

0 comments on commit fad1f1c

Please sign in to comment.