Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tsi): optimize series iteration #22316

Merged
merged 3 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Because of the version bump to `go`, the macOS build for this release requires a
1. [21910](https://github.com/influxdata/influxdb/pull/21910): Added `--ui-disabled` option to `influxd` to allow for running with the UI disabled.
1. [21958](https://github.com/influxdata/influxdb/pull/21958): Telemetry improvements: Do not record telemetry data for non-existent paths; replace invalid static asset paths with a slug.
1. [22023](https://github.com/influxdata/influxdb/pull/22023): Upgrade Flux to v0.124.0.
1. [22316](https://github.com/influxdata/influxdb/pull/22316): Optimize series iteration for queries that can be answered without inspecting TSM data.

### Bug Fixes

Expand Down
56 changes: 56 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,40 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
}

// createSeriesIterator creates an optimized series iterator if possible.
// We exclude less-common cases for now as not worth implementing.
//
// It returns (nil, nil) when the query cannot be answered from series keys
// alone; the caller then falls back to the standard iterator path.
func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
	// Main check to see if we are trying to create a seriesKey iterator
	if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
		return nil, nil
	}
	// Check some other cases that we could maybe handle, but don't
	if len(opt.Dimensions) > 0 {
		return nil, nil
	}
	if opt.SLimit != 0 || opt.SOffset != 0 {
		return nil, nil
	}
	if opt.StripName {
		return nil, nil
	}
	if opt.Ordered {
		return nil, nil
	}
	// Actual creation of the iterator
	seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
	if err != nil {
		// On error the returned cursor is a nil interface; calling Close on
		// it would panic, so just propagate the error.
		return nil, err
	}
	if seriesCursor == nil {
		// No index data for this measurement; use the standard path.
		return nil, nil
	}
	var seriesIterator query.Iterator = newSeriesIterator(measurement, seriesCursor)
	if opt.InterruptCh != nil {
		seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
	}
	return seriesIterator, nil
}

func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := call.Args[0].(*influxql.VarRef)

Expand All @@ -2399,6 +2433,28 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
return nil, nil
}

// check for optimized series iteration for tsi index
if e.index.Type() == tsdb.TSI1IndexName {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
seriesOpt := opt
if len(opt.Dimensions) == 0 && call.Name == "count" {
// no point ordering the series if we are just counting all of them
seriesOpt.Ordered = false
}
seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
if err != nil {
return nil, err
}
if seriesIterator != nil {
callIterator, err := query.NewCallIterator(seriesIterator, opt)
if err != nil {
seriesIterator.Close()
return nil, err
}
return []query.Iterator{callIterator}, nil
}
}

// Determine tagsets for this measurement based on dimensions and filters.
var (
tagSets []*query.TagSet
Expand Down
75 changes: 75 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxql"
tassert "github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -2023,6 +2024,80 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}

// Ensure the engine can answer a series-key query (both raw `_seriesKey`
// and `count(_seriesKey)`) through an iterator, for every registered index.
func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
t.Parallel()

for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
assert := tassert.New(t)
e := MustOpenEngine(t, index)
defer e.Close()

// Register the field and four series under measurement "cpu":
// host=A appears in two regions, so the host='A' filter matches 2 series.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=west"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "west"}))

if err := e.WritePointsString(
`cpu,host=A,region=east value=1.1 1000000001`,
`cpu,host=B,region=east value=1.2 1000000002`,
`cpu,host=A,region=east value=1.3 1000000003`,
`cpu,host=C,region=east value=1.4 1000000004`,
`cpu,host=A,region=west value=1.5 1000000005`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

// Raw `_seriesKey` query filtered to host=A.
opts := query.IteratorOptions{
Expr: influxql.MustParseExpr(`_seriesKey`),
Dimensions: []string{},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Condition: influxql.MustParseExpr(`host = 'A'`),
}

itr, err := e.CreateIterator(context.Background(), "cpu", opts)
if err != nil {
t.Fatal(err)
}

// The series-key iterator yields string points; each key must be one of
// the two expected host=A series, and each must be seen exactly once.
stringItr, ok := itr.(query.StringIterator)
assert.True(ok, "series iterator must be of type string")
expectedSeries := map[string]struct{}{
"cpu,host=A,region=west": struct{}{},
"cpu,host=A,region=east": struct{}{},
}
var str *query.StringPoint
for str, err = stringItr.Next(); err == nil && str != (*query.StringPoint)(nil); str, err = stringItr.Next() {
_, ok := expectedSeries[str.Value]
assert.True(ok, "Saw bad key "+str.Value)
delete(expectedSeries, str.Value)
}
assert.NoError(err)
assert.NoError(itr.Close())

// Same filter, but counting the matching series instead of listing them.
countOpts := opts
countOpts.Expr = influxql.MustParseExpr(`count(_seriesKey)`)
itr, err = e.CreateIterator(context.Background(), "cpu", countOpts)
if err != nil {
t.Fatal(err)
}

integerIter, ok := itr.(query.IntegerIterator)
assert.True(ok, "series count iterator must be of type integer")
i, err := integerIter.Next()
assert.NoError(err)
assert.Equal(int64(2), i.Value, "must count 2 series with host=A")
// A count query produces a single aggregate point, then exhaustion.
i, err = integerIter.Next()
assert.NoError(err)
assert.Equal((*query.IntegerPoint)(nil), i, "count iterator has only one output")
assert.NoError(itr.Close())
})
}
}

func makeBlockTypeSlice(n int) []byte {
r := make([]byte, n)
b := tsm1.BlockFloat64
Expand Down
69 changes: 69 additions & 0 deletions tsdb/engine/tsm1/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"context"
"fmt"
"sync"

"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/pkg/metrics"
Expand Down Expand Up @@ -216,3 +217,71 @@ func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iter
panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr))
}
}

// seriesIterator adapts a tsdb.SeriesKeyIterator to the query.Iterator
// interface, emitting one StringPoint per series key.
type seriesIterator struct {
cur tsdb.SeriesKeyIterator // underlying source of series keys
point query.StringPoint // reusable buffer returned from Next

statsLock sync.Mutex // guards stats; statsBuf is only touched by Next
stats query.IteratorStats // snapshot published to Stats()
statsBuf query.IteratorStats // working counters, copied to stats periodically
}

// newSeriesIterator wraps cur in a seriesIterator whose points carry the
// given measurement name and empty tags.
func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator {
	pt := query.StringPoint{
		Name: name,
		Tags: query.NewTags(nil),
	}
	it := &seriesIterator{cur: cur, point: pt}
	it.stats = it.statsBuf
	return it
}

// Next returns the next series-key point from the iterator, or nil when the
// underlying cursor is exhausted.
func (itr *seriesIterator) Next() (*query.StringPoint, error) {
	// Read from the main cursor.
	b, err := itr.cur.Next()
	if err != nil {
		itr.copyStats()
		return nil, err
	}

	// Exit if we have no more series keys. Check before converting so we
	// don't populate the point buffer from a nil key.
	if b == nil {
		itr.copyStats()
		return nil, nil
	}
	itr.point.Value = string(b)

	// Track points returned.
	itr.statsBuf.PointN++
	itr.statsBuf.SeriesN++

	// Copy buffer to stats periodically.
	if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 {
		itr.copyStats()
	}

	return &itr.point, nil
}

// copyStats publishes the working stats buffer to the shared stats snapshot
// under the lock.
func (itr *seriesIterator) copyStats() {
	itr.statsLock.Lock()
	defer itr.statsLock.Unlock()
	itr.stats = itr.statsBuf
}

// Stats returns a snapshot of the stats on the points processed so far.
func (itr *seriesIterator) Stats() query.IteratorStats {
	itr.statsLock.Lock()
	defer itr.statsLock.Unlock()
	return itr.stats
}

// Close closes the iterator by closing the underlying series-key cursor.
func (itr *seriesIterator) Close() error {
return itr.cur.Close()
}
93 changes: 93 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ type SeriesIDIterator interface {
Close() error
}

// SeriesKeyIterator represents an iterator over a list of SeriesKeys.
type SeriesKeyIterator interface {
// Next returns the next series key, or (nil, nil) when exhausted.
Next() ([]byte, error)
// Close releases resources held by the iterator.
Close() error
}

// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
type SeriesIDSetIterator interface {
SeriesIDIterator
Expand Down Expand Up @@ -2295,6 +2301,93 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}

// measurementSeriesKeyByExprIterator converts a SeriesIDIterator into a
// SeriesKeyIterator, applying leftover expression filtering and
// authorization to each series.
type measurementSeriesKeyByExprIterator struct {
ids SeriesIDIterator // source of matching series IDs
is IndexSet // used to resolve IDs to series keys
auth query.Authorizer // optional per-series read authorization
once sync.Once // ensures releaser runs exactly once on Close
releaser func() // releases the series-file retain taken at construction
}

// Next returns the next authorized, filter-matching series key, or
// (nil, nil) when the underlying ID iterator is exhausted.
// NOTE(review): the nil-receiver guard only helps callers holding a typed
// *measurementSeriesKeyByExprIterator nil, not a nil interface — confirm
// callers construct the iterator via MeasurementSeriesKeyByExprIterator.
func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
if itr == nil {
return nil, nil
}
for {
// Advance the ID iterator; SeriesID == 0 signals exhaustion.
e, err := itr.ids.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}

// Skip IDs whose key is no longer present in the series file.
seriesKey := itr.is.SeriesFile.SeriesKey(e.SeriesID)
if len(seriesKey) == 0 {
continue
}

name, tags := ParseSeriesKey(seriesKey)

// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
if v, ok := e.Expr.(*influxql.BooleanLiteral); ok {
if !v.Val {
continue
}
} else {
// Evaluate the residual expression against this series' tag values.
values := make(map[string]interface{}, len(tags))
for _, t := range tags {
values[string(t.Key)] = string(t.Value)
}
if !influxql.EvalBool(e.Expr, values) {
continue
}
}
}

// Drop series the caller is not authorized to read.
if itr.auth != nil && !itr.auth.AuthorizeSeriesRead(itr.is.Database(), name, tags) {
continue
}

out := models.MakeKey(name, tags)
// ensure nil is only returned when we are done (or for errors)
if out == nil {
out = []byte{}
}
return out, nil
}
}

// Close releases the series-file retain (exactly once, even if Close is
// called repeatedly) and closes the underlying ID iterator.
func (itr *measurementSeriesKeyByExprIterator) Close() error {
if itr == nil {
return nil
}
itr.once.Do(itr.releaser)
return itr.ids.Close()
}

// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
// Any non-tag expressions will be filtered as if the field had the zero value.
//
// On error, or when the measurement has no matching index data, it returns a
// nil interface value — callers must nil-check the result before calling its
// methods, because the nil-receiver guards on Next/Close do not protect a
// nil interface.
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
// Retain the series file for the lifetime of the iterator; released via Close.
release := is.SeriesFile.Retain()
// Create iterator for all matching series.
ids, err := is.measurementSeriesByExprIterator(name, expr)
if err != nil {
release()
return nil, err
}
if ids == nil {
release()
return nil, nil
}
return &measurementSeriesKeyByExprIterator{
ids: ids,
releaser: release,
auth: auth,
is: is,
}, nil
}

// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
Expand Down
2 changes: 1 addition & 1 deletion tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (f *SeriesFile) SeriesCount() uint64 {
return n
}

// SeriesIterator returns an iterator over all the series.
// SeriesIDIterator returns an iterator over all the series.
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
var ids []uint64
for _, p := range f.partitions {
Expand Down