From 4c026ebf957fe83493e02ec5453569d170ba81cb Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Mon, 11 Dec 2017 17:04:22 +0900 Subject: [PATCH] update lastWrite on write into rob and flush rob on GC --- mdata/aggmetric.go | 22 +++++++++++++-- mdata/reorder_buffer.go | 31 +++++++++++++++++++-- mdata/reorder_buffer_test.go | 53 +++++++++++++++++++++++++++--------- 3 files changed, 88 insertions(+), 18 deletions(-) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 2df48f0bfd..2ca03da0d2 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -435,7 +435,12 @@ func (a *AggMetric) Add(ts uint32, val float64) { a.add(ts, val) } else { // write through reorder buffer - res := a.rob.Add(ts, val) + res, accepted := a.rob.Add(ts, val) + + if len(res) == 0 && accepted { + a.lastWrite = uint32(time.Now().Unix()) + } + for _, p := range res { a.add(p.Ts, p.Val) } @@ -541,8 +546,21 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool { a.Lock() defer a.Unlock() + // if the reorderBuffer is enabled and we have not received a datapoint in a while, + // then flush the reorder buffer. + if a.rob != nil && a.lastWrite < chunkMinTs { + tmpLastWrite := a.lastWrite + pts := a.rob.Flush() + for _, p := range pts { + a.add(p.Ts, p.Val) + } + + // adding points will cause our lastWrite to be updated, but we want to keep the old value + a.lastWrite = tmpLastWrite + } + // this aggMetric has never had metrics written to it. - if len(a.Chunks) == 0 { + if len(a.Chunks) == 0 && (a.rob == nil || !a.rob.HasData()) { return true } diff --git a/mdata/reorder_buffer.go b/mdata/reorder_buffer.go index 18e3b97af3..24f8c179cc 100644 --- a/mdata/reorder_buffer.go +++ b/mdata/reorder_buffer.go @@ -24,13 +24,13 @@ func NewReorderBuffer(reorderWindow uint32, interval int) *ReorderBuffer { return buf } -func (rob *ReorderBuffer) Add(ts uint32, val float64) []schema.Point { +func (rob *ReorderBuffer) Add(ts uint32, val float64) ([]schema.Point, bool) { ts = AggBoundary(ts, rob.interval) // out of order and too old if rob.buf[rob.newest].Ts != 0 && ts <= rob.buf[rob.newest].Ts-(rob.len*rob.interval) { metricsTooOld.Inc() - return nil + return nil, false } var res []schema.Point @@ -59,7 +59,7 @@ func (rob *ReorderBuffer) Add(ts uint32, val float64) []schema.Point { rob.buf[index].Val = val } - return res + return res, true } // returns all the data in the buffer as a raw list of points @@ -79,3 +79,28 @@ func (rob *ReorderBuffer) Get() []schema.Point { return res } + +func (rob *ReorderBuffer) Reset() { + for i := range rob.buf { + rob.buf[i].Ts = 0 + rob.buf[i].Val = 0 + } + rob.newest = 0 +} + +func (rob *ReorderBuffer) Flush() []schema.Point { + res := rob.Get() + rob.Reset() + + return res +} + +func (rob *ReorderBuffer) HasData() bool { + for _, point := range rob.buf { + if point.Ts > 0 { + return true + } + } + + return false +} diff --git a/mdata/reorder_buffer_test.go b/mdata/reorder_buffer_test.go index 68362ff497..a7dfeaeeea 100644 --- a/mdata/reorder_buffer_test.go +++ b/mdata/reorder_buffer_test.go @@ -7,18 +7,27 @@ import ( "gopkg.in/raintank/schema.v1" ) -func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAddFail, expectReordered uint32) []schema.Point { +func testAddAndGet(t *testing.T, reorderWindow uint32, testData, expectedData []schema.Point, expectAdded, expectAddFail, expectReordered uint32) []schema.Point { var flushed []schema.Point b := NewReorderBuffer(reorderWindow, 1) metricsTooOld.SetUint32(0) metricsReordered.SetUint32(0) + addedCount := uint32(0) for _, point := range testData { - addRes := b.Add(point.Ts, point.Val) + addRes, accepted := b.Add(point.Ts, point.Val) flushed = append(flushed, addRes...) + if accepted { + addedCount++ + } } if expectAddFail != metricsTooOld.Peek() { t.Fatalf("Expected %d failures, but had %d", expectAddFail, metricsTooOld.Peek()) } + + if expectAdded != addedCount { + t.Fatalf("Expected %d accepted datapoints, but had %d", expectAdded, addedCount) + } + if metricsReordered.Peek() != expectReordered { t.Fatalf("Expected %d metrics to get reordered, but had %d", expectReordered, metricsReordered.Peek()) } @@ -95,7 +104,7 @@ func TestReorderBufferAddAndGetInOrder(t *testing.T) { {Ts: 1002, Val: 200}, {Ts: 1003, Val: 300}, } - testAddAndGet(t, 600, testData, expectedData, 0, 0) + testAddAndGet(t, 600, testData, expectedData, 3, 0, 0) } func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) { @@ -107,7 +116,7 @@ func TestReorderBufferAddAndGetInReverseOrderOutOfWindow(t *testing.T) { expectedData := []schema.Point{ {Ts: 1003, Val: 300}, } - testAddAndGet(t, 1, testData, expectedData, 2, 0) + testAddAndGet(t, 1, testData, expectedData, 1, 2, 0) } func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) { @@ -133,7 +142,7 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindow(t *testing.T) { {Ts: 1008, Val: 800}, {Ts: 1009, Val: 900}, } - testAddAndGet(t, 600, testData, expectedData, 0, 2) + testAddAndGet(t, 600, testData, expectedData, 9, 0, 2) } func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) { @@ -159,13 +168,13 @@ func TestReorderBufferAddAndGetOutOfOrderInsideWindowAsFirstPoint(t *testing.T) {Ts: 1008, Val: 800}, {Ts: 1009, Val: 900}, } - testAddAndGet(t, 600, testData, expectedData, 0, 3) + testAddAndGet(t, 600, testData, expectedData, 9, 0, 3) } func TestReorderBufferOmitFlushIfNotEnoughData(t *testing.T) { b := NewReorderBuffer(9, 1) for i := uint32(1); i < 10; i++ { - flushed := b.Add(i, float64(i*100)) + flushed, _ := b.Add(i, float64(i*100)) if len(flushed) > 0 { t.Fatalf("Expected no data to get flushed out") } @@ -197,7 +206,7 @@ func TestReorderBufferAddAndGetOutOfOrderOutOfWindow(t *testing.T) { {Ts: 1003, Val: 300}, {Ts: 1004, Val: 400}, } - flushedData := testAddAndGet(t, 5, testData, expectedData, 1, 2) + flushedData := testAddAndGet(t, 5, testData, expectedData, 8, 1, 2) if !reflect.DeepEqual(flushedData, expectedFlushedData) { t.Fatalf("Flushed data does not match expected flushed data:\n%+v\n%+v", flushedData, expectedFlushedData) } @@ -208,8 +217,8 @@ func TestReorderBufferFlushSortedData(t *testing.T) { buf := NewReorderBuffer(600, 1) metricsTooOld.SetUint32(0) for i := 1100; i < 2100; i++ { - flushed := buf.Add(uint32(i), float64(i)) - if metricsTooOld.Peek() != 0 { + flushed, accepted := buf.Add(uint32(i), float64(i)) + if metricsTooOld.Peek() != 0 || !accepted { t.Fatalf("Adding failed") } results = append(results, flushed...) @@ -238,8 +247,8 @@ func TestReorderBufferFlushUnsortedData1(t *testing.T) { failedCount := 0 metricsTooOld.SetUint32(0) for _, p := range data { - flushed := buf.Add(p.Ts, p.Val) - if metricsTooOld.Peek() != 0 { + flushed, accepted := buf.Add(p.Ts, p.Val) + if metricsTooOld.Peek() != 0 || !accepted { failedCount++ metricsTooOld.SetUint32(0) } else { @@ -272,7 +281,7 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) { } unsortedData := unsort(data, 10) for i := 0; i < len(data); i++ { - flushed := buf.Add(unsortedData[i].Ts, unsortedData[i].Val) + flushed, _ := buf.Add(unsortedData[i].Ts, unsortedData[i].Val) results = append(results, flushed...) } for i := 0; i < 400; i++ { @@ -282,6 +291,24 @@ func TestReorderBufferFlushUnsortedData2(t *testing.T) { } } +func TestReorderBufferFlushAndHasData(t *testing.T) { + buf := NewReorderBuffer(10, 1) + + if buf.HasData() != false { + t.Fatalf("Expected HasData() to be false") + } + + buf.Add(123, 123) + if buf.HasData() != true { + t.Fatalf("Expected HasData() to be true") + } + + buf.Reset() + if buf.HasData() != false { + t.Fatalf("Expected HasData() to be false") + } +} + func BenchmarkAddInOrder(b *testing.B) { data := make([]schema.Point, b.N) buf := NewReorderBuffer(uint32(b.N), 1)