From ebda575998c6803cc196136ff6fd789f93d071cf Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 4 Nov 2019 13:26:48 +0100 Subject: [PATCH 1/4] fix(storage): add failing test for array cursor iterator stats --- tsdb/tsm1/engine_cursor_test.go | 122 ++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 tsdb/tsm1/engine_cursor_test.go diff --git a/tsdb/tsm1/engine_cursor_test.go b/tsdb/tsm1/engine_cursor_test.go new file mode 100644 index 00000000000..06b7edac6bd --- /dev/null +++ b/tsdb/tsm1/engine_cursor_test.go @@ -0,0 +1,122 @@ +package tsm1_test + +import ( + "context" + "testing" + "time" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +func TestEngine_CursorIterator_Stats(t *testing.T) { + e := MustOpenEngine() + defer e.Close() + + points := []models.Point{ + models.MustNewPoint("cpu", + models.Tags{ + {Key: []byte("a"), Value: []byte("b")}, + }, + models.Fields{"value": 4.6}, + time.Now().UTC(), + ), + models.MustNewPoint("cpu", + models.Tags{ + {Key: []byte("a"), Value: []byte("b")}, + }, + models.Fields{"value": 3.2}, + time.Now().UTC(), + ), + models.MustNewPoint("mem", + models.Tags{ + {Key: []byte("b"), Value: []byte("c")}, + }, + models.Fields{"value": int64(3)}, + time.Now().UTC(), + ), + } + + // Write into the index. + collection := tsdb.NewSeriesCollection(points) + if err := e.index.CreateSeriesListIfNotExists(collection); err != nil { + t.Fatal(err) + } + + if err := e.WritePoints(points); err != nil { + t.Fatal(err) + } + + e.MustWriteSnapshot() + + ctx := context.Background() + cursorIterator, err := e.CreateCursorIterator(ctx) + if err != nil { + t.Fatal(err) + } + + cur, err := cursorIterator.Next(ctx, &tsdb.CursorRequest{ + Name: []byte("cpu"), + Tags: []models.Tag{{Key: []byte("a"), Value: []byte("b")}}, + Field: "value", + EndTime: time.Now().UTC().UnixNano(), + Ascending: true, + }) + + if err != nil { + t.Fatal(err) + } + + if cur == nil { + t.Fatal("expected cursor to be present") + } + + fc, ok := cur.(cursors.FloatArrayCursor) + if !ok { + t.Fatalf("unexpected cursor type: expected FloatArrayCursor, got %#v", cur) + } + + // drain the cursor + for a := fc.Next(); a.Len() > 0; a = fc.Next() { + } + + // iterator should report float stats + if got, exp := cursorIterator.Stats(), (cursors.CursorStats{ScannedValues: 2, ScannedBytes: 16}); exp != got { + t.Fatalf("expected %v, got %v", exp, got) + } + + cur.Close() + + cur, err = cursorIterator.Next(ctx, &tsdb.CursorRequest{ + Name: []byte("mem"), + Tags: []models.Tag{{Key: []byte("b"), Value: []byte("c")}}, + Field: "value", + EndTime: time.Now().UTC().UnixNano(), + Ascending: true, + }) + + if err != nil { + t.Fatal(err) + } + + if cur == nil { + t.Fatal("expected cursor to be present") + } + + defer cur.Close() + + ic, ok := cur.(cursors.IntegerArrayCursor) + if !ok { + t.Fatalf("unexpected cursor type: expected FloatArrayCursor, got %#v", cur) + } + + // drain the cursor + for a := ic.Next(); a.Len() > 0; a = ic.Next() { + } + + // iterator should report integer array stats + if got, exp := cursorIterator.Stats(), (cursors.CursorStats{ScannedValues: 1, ScannedBytes: 8}); exp != got { + t.Fatalf("expected %v, got %v", exp, got) + } +} From 5fcd1937b2e8e209f2303517cd7040550f2cbc9c Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 4 Nov 2019 16:00:23 +0100 Subject: [PATCH 2/4] fix(storage): make arrayCursorIterator.Stats() return stats of in-focus cursor --- tsdb/tsm1/array_cursor_iterator.go | 99 +++++++++++++++++++++--------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/tsdb/tsm1/array_cursor_iterator.go b/tsdb/tsm1/array_cursor_iterator.go index 8ad3ef7fe8e..a6b5b7c198f 100644 --- a/tsdb/tsm1/array_cursor_iterator.go +++ b/tsdb/tsm1/array_cursor_iterator.go @@ -12,8 +12,10 @@ import ( ) type arrayCursorIterator struct { - e *Engine - key []byte + e *Engine + key []byte + id tsdb.SeriesIDTyped + isAsc bool asc struct { Float *floatArrayAscendingCursor @@ -34,8 +36,9 @@ type arrayCursorIterator struct { func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (tsdb.Cursor, error) { q.key = tsdb.AppendSeriesKey(q.key[:0], r.Name, r.Tags) - id := q.e.sfile.SeriesIDTypedBySeriesKey(q.key) - if id.IsZero() { + q.isAsc = r.Ascending + q.id = q.e.sfile.SeriesIDTypedBySeriesKey(q.key) + if q.id.IsZero() { return nil, nil } @@ -51,7 +54,7 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) ( opt.EndTime = r.EndTime // Return appropriate cursor based on type. - switch typ := id.Type(); typ { + switch typ := q.id.Type(); typ { case models.Float: return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil case models.Integer: @@ -75,29 +78,67 @@ func (q *arrayCursorIterator) seriesFieldKeyBytes(name []byte, tags models.Tags, } // Stats returns the cumulative stats for all cursors. -func (q *arrayCursorIterator) Stats() cursors.CursorStats { - var stats cursors.CursorStats - if cur := q.asc.Float; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.asc.Integer; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.asc.Unsigned; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.asc.Boolean; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.asc.String; cur != nil { - stats.Add(cur.Stats()) - } - if cur := q.desc.Float; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.desc.Integer; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.desc.Unsigned; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.desc.Boolean; cur != nil { - stats.Add(cur.Stats()) - } else if cur := q.desc.String; cur != nil { - stats.Add(cur.Stats()) +func (q *arrayCursorIterator) Stats() (stats cursors.CursorStats) { + // Return appropriate cursor based on type. + switch typ := q.id.Type(); typ { + case models.Float: + if q.isAsc { + if cur := q.asc.Float; cur != nil { + stats.Add(cur.Stats()) + } + return + } + + if cur := q.desc.Float; cur != nil { + stats.Add(cur.Stats()) + } + case models.Integer: + if q.isAsc { + if cur := q.asc.Integer; cur != nil { + stats.Add(cur.Stats()) + } + return + } + + if cur := q.desc.Integer; cur != nil { + stats.Add(cur.Stats()) + } + case models.Unsigned: + if q.isAsc { + if cur := q.asc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } + return + } + + if cur := q.desc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } + case models.String: + if q.isAsc { + if cur := q.asc.String; cur != nil { + stats.Add(cur.Stats()) + } + return + } + + if cur := q.desc.String; cur != nil { + stats.Add(cur.Stats()) + } + case models.Boolean: + if q.isAsc { + if cur := q.asc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } + return + } + + if cur := q.desc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } + default: + panic(fmt.Sprintf("unreachable: %v", typ)) } - return stats + + return } From 4af0c0ad98095a8c49abda93097f64a64053d14a Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 4 Nov 2019 18:27:49 +0100 Subject: [PATCH 3/4] fix(storage): add failing test to assert arrayCursorIterator.Stats() returns accumulated result --- tsdb/tsm1/engine_cursor_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tsdb/tsm1/engine_cursor_test.go b/tsdb/tsm1/engine_cursor_test.go index 06b7edac6bd..6b2652a99b6 100644 --- a/tsdb/tsm1/engine_cursor_test.go +++ b/tsdb/tsm1/engine_cursor_test.go @@ -81,11 +81,6 @@ func TestEngine_CursorIterator_Stats(t *testing.T) { for a := fc.Next(); a.Len() > 0; a = fc.Next() { } - // iterator should report float stats - if got, exp := cursorIterator.Stats(), (cursors.CursorStats{ScannedValues: 2, ScannedBytes: 16}); exp != got { - t.Fatalf("expected %v, got %v", exp, got) - } - cur.Close() cur, err = cursorIterator.Next(ctx, &tsdb.CursorRequest{ @@ -116,7 +111,7 @@ func TestEngine_CursorIterator_Stats(t *testing.T) { } // iterator should report integer array stats - if got, exp := cursorIterator.Stats(), (cursors.CursorStats{ScannedValues: 1, ScannedBytes: 8}); exp != got { + if got, exp := cursorIterator.Stats(), (cursors.CursorStats{ScannedValues: 3, ScannedBytes: 24}); exp != got { t.Fatalf("expected %v, got %v", exp, got) } } From 6e616b90e6c2f236ce3d6f01e39c46a161e16e3e Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Mon, 4 Nov 2019 18:29:25 +0100 Subject: [PATCH 4/4] fix(storage): assumulate stats in arrayCursorIterator.Stats() call across all observed cursors --- CHANGELOG.md | 6 ++ tsdb/tsm1/array_cursor_iterator.go | 107 ++++++++++------------------- 2 files changed, 43 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f987eca45c..01b9f6f77f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v2.0.0-alpha.20 [unreleased] + +### Bug Fixes + +1. [15731](https://github.com/influxdata/influxdb/pull/15731): Ensure array cursor iterator stats accumulate all cursor stats + ## v2.0.0-alpha.19 [2019-10-30] ### Features diff --git a/tsdb/tsm1/array_cursor_iterator.go b/tsdb/tsm1/array_cursor_iterator.go index a6b5b7c198f..f2dae11c748 100644 --- a/tsdb/tsm1/array_cursor_iterator.go +++ b/tsdb/tsm1/array_cursor_iterator.go @@ -12,10 +12,8 @@ import ( ) type arrayCursorIterator struct { - e *Engine - key []byte - id tsdb.SeriesIDTyped - isAsc bool + e *Engine + key []byte asc struct { Float *floatArrayAscendingCursor @@ -36,9 +34,8 @@ type arrayCursorIterator struct { func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (tsdb.Cursor, error) { q.key = tsdb.AppendSeriesKey(q.key[:0], r.Name, r.Tags) - q.isAsc = r.Ascending - q.id = q.e.sfile.SeriesIDTypedBySeriesKey(q.key) - if q.id.IsZero() { + id := q.e.sfile.SeriesIDTypedBySeriesKey(q.key) + if id.IsZero() { return nil, nil } @@ -54,7 +51,7 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) ( opt.EndTime = r.EndTime // Return appropriate cursor based on type. - switch typ := q.id.Type(); typ { + switch typ := id.Type(); typ { case models.Float: return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil case models.Integer: @@ -78,67 +75,37 @@ func (q *arrayCursorIterator) seriesFieldKeyBytes(name []byte, tags models.Tags, } // Stats returns the cumulative stats for all cursors. -func (q *arrayCursorIterator) Stats() (stats cursors.CursorStats) { - // Return appropriate cursor based on type. - switch typ := q.id.Type(); typ { - case models.Float: - if q.isAsc { - if cur := q.asc.Float; cur != nil { - stats.Add(cur.Stats()) - } - return - } - - if cur := q.desc.Float; cur != nil { - stats.Add(cur.Stats()) - } - case models.Integer: - if q.isAsc { - if cur := q.asc.Integer; cur != nil { - stats.Add(cur.Stats()) - } - return - } - - if cur := q.desc.Integer; cur != nil { - stats.Add(cur.Stats()) - } - case models.Unsigned: - if q.isAsc { - if cur := q.asc.Unsigned; cur != nil { - stats.Add(cur.Stats()) - } - return - } - - if cur := q.desc.Unsigned; cur != nil { - stats.Add(cur.Stats()) - } - case models.String: - if q.isAsc { - if cur := q.asc.String; cur != nil { - stats.Add(cur.Stats()) - } - return - } - - if cur := q.desc.String; cur != nil { - stats.Add(cur.Stats()) - } - case models.Boolean: - if q.isAsc { - if cur := q.asc.Boolean; cur != nil { - stats.Add(cur.Stats()) - } - return - } - - if cur := q.desc.Boolean; cur != nil { - stats.Add(cur.Stats()) - } - default: - panic(fmt.Sprintf("unreachable: %v", typ)) +func (q *arrayCursorIterator) Stats() cursors.CursorStats { + var stats cursors.CursorStats + if cur := q.asc.Float; cur != nil { + stats.Add(cur.Stats()) } - - return + if cur := q.asc.Integer; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.asc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.asc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.asc.String; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.Float; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.Integer; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.String; cur != nil { + stats.Add(cur.Stats()) + } + return stats }