diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 2d453f5ef4..a7a4926b47 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -436,7 +436,8 @@ func (a *AggMetric) Add(ts uint32, val float64) { } else { // write through reorder buffer res, accepted := a.rob.Add(ts, val) - if accepted { + + if len(res) == 0 && accepted { a.lastWrite = uint32(time.Now().Unix()) } diff --git a/mdata/reorder_buffer_test.go b/mdata/reorder_buffer_test.go index 68362ff497..68d7d91fe5 100644 --- a/mdata/reorder_buffer_test.go +++ b/mdata/reorder_buffer_test.go @@ -7,18 +7,27 @@ import ( "gopkg.in/raintank/schema.v1" ) -func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAddFail, expectReordered uint32) []schema.Point { +func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAdded, expectAddFail, expectReordered uint32) []schema.Point { var flushed []schema.Point b := NewReorderBuffer(reorderWindow, 1) metricsTooOld.SetUint32(0) metricsReordered.SetUint32(0) + addedCount := uint32(0) for _, point := range testData { - addRes := b.Add(point.Ts, point.Val) + addRes, accepted := b.Add(point.Ts, point.Val) flushed = append(flushed, addRes...) + if accepted { + addedCount++ + } } if expectAddFail != metricsTooOld.Peek() { t.Fatalf("Expected %d failures, but had %d", expectAddFail, metricsTooOld.Peek()) } + + if expectAdded != addedCount { + t.Fatalf("Expected %d accepted datapoints, but had %d", expectAdded, addedCount) + } + if metricsReordered.Peek() != expectReordered { t.Fatalf("Expected %d metrics to get reordered, but had %d", expectReordered, metricsReordered.Peek()) } @@ -95,7 +104,7 @@ func TestReorderBufferAddAndGetInOrder(t *testing.T) { {Ts: 1002, Val: 200}, {Ts: 1003, Val: 300}, } - testAddAndGet(t, 600, testData, expectedData, 0, 0) + testAddAndGet(t, 600, testData, expectedData, 3, 0, 0) } func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) { @@ -107,7 +116,7 @@ func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) { expectedData := []schema.Point{ {Ts: 1003, Val: 300}, } - testAddAndGet(t, 1, testData, expectedData, 2, 0) + testAddAndGet(t, 1, testData, expectedData, 1, 2, 0) } func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) { @@ -133,7 +142,7 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) { {Ts: 1008, Val: 800}, {Ts: 1009, Val: 900}, } - testAddAndGet(t, 600, testData, expectedData, 0, 2) + testAddAndGet(t, 600, testData, expectedData, 9, 0, 2) } func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) { @@ -159,13 +168,13 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) {Ts: 1008, Val: 800}, {Ts: 1009, Val: 900}, } - testAddAndGet(t, 600, testData, expectedData, 0, 3) + testAddAndGet(t, 600, testData, expectedData, 9, 0, 3) } func TestReorderBufferOmitFlushIfNotEnoughData(t *testing.T) { b := NewReorderBuffer(9, 1) for i := uint32(1); i < 10; i++ { - flushed := b.Add(i, float64(i*100)) + flushed, _ := b.Add(i, float64(i*100)) if len(flushed) > 0 { t.Fatalf("Expected no data to get flushed out") } @@ -197,7 +206,7 @@ func TestReorderBufferAddAndGetOutOfOrderOutOfWindow(t *testing.T) { {Ts: 1003, Val: 300}, {Ts: 1004, Val: 400}, } - flushedData := testAddAndGet(t, 5, testData, expectedData, 1, 2) + flushedData := testAddAndGet(t, 5, testData, expectedData, 8, 1, 2) if !reflect.DeepEqual(flushedData, expectedFlushedData) { t.Fatalf("Flushed data does not match expected flushed data:\n%+v\n%+v", flushedData, expectedFlushedData) } @@ -208,8 +217,8 @@ func TestReorderBufferFlushSortedData(t *testing.T) { buf := NewReorderBuffer(600, 1) metricsTooOld.SetUint32(0) for i := 1100; i < 2100; i++ { - flushed := buf.Add(uint32(i), float64(i)) - if metricsTooOld.Peek() != 0 { + flushed, accepted := buf.Add(uint32(i), float64(i)) + if metricsTooOld.Peek() != 0 || !accepted { t.Fatalf("Adding failed") } results = append(results, flushed...) @@ -238,8 +247,8 @@ func TestReorderBufferFlushUnsortedData1(t *testing.T) { failedCount := 0 metricsTooOld.SetUint32(0) for _, p := range data { - flushed := buf.Add(p.Ts, p.Val) - if metricsTooOld.Peek() != 0 { + flushed, accepted := buf.Add(p.Ts, p.Val) + if metricsTooOld.Peek() != 0 || !accepted { failedCount++ metricsTooOld.SetUint32(0) } else { @@ -272,7 +281,7 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) { } unsortedData := unsort(data, 10) for i := 0; i < len(data); i++ { - flushed := buf.Add(unsortedData[i].Ts, unsortedData[i].Val) + flushed, _ := buf.Add(unsortedData[i].Ts, unsortedData[i].Val) results = append(results, flushed...) } for i := 0; i < 400; i++ {