Skip to content

Commit

Permalink
[coordinator] Only fanout to resolved cluster namespaces for metadata…
Browse files Browse the repository at this point in the history
…/aggregate queries (#3234)
  • Loading branch information
robskillington authored Feb 17, 2021
1 parent 1778b9f commit 30e1c10
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 95 deletions.
8 changes: 5 additions & 3 deletions src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/storage"
graphitestorage "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
Expand All @@ -50,15 +50,17 @@ var (
)

type grahiteFindHandler struct {
storage storage.Storage
storage graphitestorage.Storage
fetchOptionsBuilder handleroptions.FetchOptionsBuilder
instrumentOpts instrument.Options
}

// NewFindHandler returns a new instance of handler.
func NewFindHandler(opts options.HandlerOptions) http.Handler {
wrappedStore := graphitestorage.NewM3WrappedStorage(opts.Storage(),
opts.M3DBOptions(), opts.InstrumentOpts(), opts.GraphiteStorageOptions())
return &grahiteFindHandler{
storage: opts.Storage(),
storage: wrappedStore,
fetchOptionsBuilder: opts.FetchOptionsBuilder(),
instrumentOpts: opts.InstrumentOpts(),
}
Expand Down
12 changes: 12 additions & 0 deletions src/query/graphite/common/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package common

import (
stdcontext "context"
"fmt"
"math"
"testing"
Expand All @@ -31,6 +32,8 @@ import (
"github.com/m3db/m3/src/query/graphite/storage"
xtest "github.com/m3db/m3/src/query/graphite/testing"
"github.com/m3db/m3/src/query/graphite/ts"
querystorage "github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -162,3 +165,12 @@ func (s *MovingFunctionStorage) fetchByIDs(

return storage.NewFetchResult(ctx, seriesList, block.NewResultMetadata()), nil
}

// CompleteTags implements the storage interface.
func (s *MovingFunctionStorage) CompleteTags(
ctx stdcontext.Context,
query *querystorage.CompleteTagsQuery,
opts *querystorage.FetchOptions,
) (*consolidators.CompleteTagsResult, error) {
return nil, fmt.Errorf("not implemented")
}
14 changes: 13 additions & 1 deletion src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package native

import (
"context"
"fmt"
"math"
"math/rand"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/m3db/m3/src/query/graphite/storage"
xtest "github.com/m3db/m3/src/query/graphite/testing"
"github.com/m3db/m3/src/query/graphite/ts"
querystorage "github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
xgomock "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -2558,10 +2561,19 @@ func TestSubstr(t *testing.T) {
type mockStorage struct{}

func (*mockStorage) FetchByQuery(
ctx xctx.Context, query string, opts storage.FetchOptions,
ctx xctx.Context,
query string,
opts storage.FetchOptions,
) (*storage.FetchResult, error) {
return storage.NewFetchResult(ctx, nil, block.NewResultMetadata()), nil
}
func (*mockStorage) CompleteTags(
ctx context.Context,
query *querystorage.CompleteTagsQuery,
opts *querystorage.FetchOptions,
) (*consolidators.CompleteTagsResult, error) {
return nil, fmt.Errorf("not implemented")
}

func TestHoltWintersForecast(t *testing.T) {
ctx := common.NewTestContext()
Expand Down
51 changes: 37 additions & 14 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
querystorage "github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/ts/m3db"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -396,6 +398,21 @@ func translateTimeseriesFromIter(
return series, nil
}

func (s *m3WrappedStore) fanoutOptions() *storage.FanoutOptions {
fanoutOpts := &storage.FanoutOptions{
FanoutUnaggregated: storage.FanoutForceDisable,
FanoutAggregated: storage.FanoutDefault,
FanoutAggregatedOptimized: storage.FanoutForceDisable,
}
if s.opts.AggregateNamespacesAllData {
// NB(r): If aggregate namespaces house all the data, we can do a
// default optimized fanout where we only query the namespaces
// that contain the data for the ranges we are querying for.
fanoutOpts.FanoutAggregatedOptimized = storage.FanoutDefault
}
return fanoutOpts
}

func (s *m3WrappedStore) FetchByQuery(
ctx xctx.Context, query string, fetchOpts FetchOptions,
) (*FetchResult, error) {
Expand All @@ -412,26 +429,20 @@ func (s *m3WrappedStore) FetchByQuery(
}, nil
}

m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), fetchOpts.Timeout)
defer cancel()
m3ctx := ctx.RequestContext()
if _, ok := m3ctx.Deadline(); !ok {
var cancel context.CancelFunc
m3ctx, cancel = context.WithTimeout(m3ctx, fetchOpts.Timeout)
defer cancel()
}

fetchOptions := storage.NewFetchOptions()
fetchOptions.SeriesLimit = fetchOpts.Limit
fetchOptions.Source = fetchOpts.Source

// NB: ensure single block return.
fetchOptions.BlockType = models.TypeSingleBlock
fetchOptions.FanoutOptions = &storage.FanoutOptions{
FanoutUnaggregated: storage.FanoutForceDisable,
FanoutAggregated: storage.FanoutDefault,
FanoutAggregatedOptimized: storage.FanoutForceDisable,
}
if s.opts.AggregateNamespacesAllData {
// NB(r): If aggregate namespaces house all the data, we can do a
// default optimized fanout where we only query the namespaces
// that contain the data for the ranges we are querying for.
fetchOptions.FanoutOptions.FanoutAggregatedOptimized = storage.FanoutDefault
}

fetchOptions.FanoutOptions = s.fanoutOptions()
res, err := s.m3.FetchBlocks(m3ctx, m3query, fetchOptions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -460,3 +471,15 @@ func (s *m3WrappedStore) FetchByQuery(

return NewFetchResult(ctx, series, res.Metadata), nil
}

func (s *m3WrappedStore) CompleteTags(
ctx context.Context,
query *querystorage.CompleteTagsQuery,
opts *querystorage.FetchOptions,
) (*consolidators.CompleteTagsResult, error) {
// NB(r): Make sure to apply consistent fanout options to both
// queries and aggregate queries for Graphite.
opts = opts.Clone() // Clone to avoid mutating input and cause data races.
opts.FanoutOptions = s.fanoutOptions()
return s.m3.CompleteTags(ctx, query, opts)
}
14 changes: 13 additions & 1 deletion src/query/graphite/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
package storage

import (
stdcontext "context"
"sync"
"time"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/ts"
querystorage "github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
)

// FetchOptions provides context to a fetch expression.
Expand Down Expand Up @@ -55,8 +58,17 @@ type DataOptions struct {
type Storage interface {
// FetchByQuery fetches timeseries data based on a query.
FetchByQuery(
ctx context.Context, query string, opts FetchOptions,
ctx context.Context,
query string,
opts FetchOptions,
) (*FetchResult, error)

// CompleteTags fetches tag data based on a request.
CompleteTags(
ctx stdcontext.Context,
query *querystorage.CompleteTagsQuery,
opts *querystorage.FetchOptions,
) (*consolidators.CompleteTagsResult, error)
}

// FetchResult provides a fetch result and meta information.
Expand Down
24 changes: 21 additions & 3 deletions src/query/graphite/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 34 additions & 12 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (s *m3storage) CompleteTags(
aggOpts := storage.FetchOptionsToAggregateOptions(options, query)
var (
nameOnly = query.CompleteNameOnly
namespaces = s.clusters.ClusterNamespaces()
tagOpts = s.opts.TagOptions()
accumulatedTags = consolidators.NewCompleteTagsResultBuilder(nameOnly, tagOpts)
multiErr syncMultiErrs
Expand All @@ -438,8 +437,20 @@ func (s *m3storage) CompleteTags(
)
}

if len(namespaces) == 0 {
return nil, errNoNamespacesConfigured
// NB(r): Since we don't use a single index we fan out to each
// cluster that can completely fulfill this range and then prefer the
// highest resolution (most fine grained) results.
// This needs to be optimized, however this is a start.
_, namespaces, err := resolveClusterNamespacesForQuery(
s.nowFn(),
query.Start,
query.End,
s.clusters,
options.FanoutOptions,
options.RestrictQueryOptions,
)
if err != nil {
return nil, err
}

var mu sync.Mutex
Expand Down Expand Up @@ -545,7 +556,7 @@ func (s *m3storage) SearchCompressed(

select {
case <-ctx.Done():
return tagResult, nil, ctx.Err()
return tagResult, noop, ctx.Err()
default:
}

Expand All @@ -555,11 +566,26 @@ func (s *m3storage) SearchCompressed(
}

var (
namespaces = s.clusters.ClusterNamespaces()
m3opts = storage.FetchOptionsToM3Options(options, query)
result = consolidators.NewMultiFetchTagsResult(s.opts.TagOptions())
wg sync.WaitGroup
m3opts = storage.FetchOptionsToM3Options(options, query)
result = consolidators.NewMultiFetchTagsResult(s.opts.TagOptions())
wg sync.WaitGroup
)

// NB(r): Since we don't use a single index we fan out to each
// cluster that can completely fulfill this range and then prefer the
// highest resolution (most fine grained) results.
// This needs to be optimized, however this is a start.
_, namespaces, err := resolveClusterNamespacesForQuery(
s.nowFn(),
query.Start,
query.End,
s.clusters,
options.FanoutOptions,
options.RestrictQueryOptions,
)
if err != nil {
return tagResult, noop, err
}

debugLog := s.logger.Check(zapcore.DebugLevel,
"searching")
Expand All @@ -572,10 +598,6 @@ func (s *m3storage) SearchCompressed(
)
}

if len(namespaces) == 0 {
return tagResult, noop, errNoNamespacesConfigured
}

wg.Add(len(namespaces))
for _, namespace := range namespaces {
namespace := namespace // Capture var
Expand Down
Loading

0 comments on commit 30e1c10

Please sign in to comment.