Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument query path #3182

Merged
merged 15 commits into from
Feb 7, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@ type serviceMetrics struct {
writeTaggedBatchRawRPCs tally.Counter
writeTaggedBatchRaw instrument.BatchMethodMetrics
overloadRejected tally.Counter

queryTimingFetchTagged queryMetrics
queryTimingAggregate queryMetrics
queryTimingAggregateRaw queryMetrics

queryTimingDataRead queryMetrics
}

// newQueryMetrics constructs the pair of query timing metrics for the given
// metric name: one bucketed by the query's time range and one bucketed by the
// number of documents (cardinality) the query touched.
func newQueryMetrics(name string, scope tally.Scope) queryMetrics {
	var qm queryMetrics
	qm.byRange = limits.NewQueryRangeMetrics(name, scope)
	qm.byDocs = limits.NewCardinalityMetrics(name, scope)
	return qm
}

type queryMetrics struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this into the limits package? Seems like we typically want the combo of by range / by cardinality together

Copy link
Collaborator Author

@ryanhall07 ryanhall07 Feb 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i think it's strange they are in the limits package. moved them to the index package. good call on reusing though.

byRange limits.QueryDurationMetrics
byDocs limits.QueryCardinalityMetrics
}

func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceMetrics {
Expand All @@ -143,6 +161,11 @@ func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceM
writeTaggedBatchRawRPCs: scope.Counter("writeTaggedBatchRaw-rpcs"),
writeTaggedBatchRaw: instrument.NewBatchMethodMetrics(scope, "writeTaggedBatchRaw", opts),
overloadRejected: scope.Counter("overload-rejected"),

queryTimingFetchTagged: newQueryMetrics("fetch_tagged", scope),
queryTimingAggregate: newQueryMetrics("aggregate", scope),
queryTimingAggregateRaw: newQueryMetrics("aggregate_raw", scope),
queryTimingDataRead: newQueryMetrics("data_read", scope),
}
}

Expand Down Expand Up @@ -803,6 +826,7 @@ func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
s.readRPCCompleted()
}))

startTime := s.nowFn()
ns, query, opts, fetchData, err := convert.FromRPCFetchTaggedRequest(req, s.pools)
if err != nil {
return nil, tterrors.NewBadRequestError(err)
Expand All @@ -826,6 +850,11 @@ func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
tagEncoder: tagEncoder,
iOpts: s.opts.InstrumentOptions(),
instrumentClose: instrumentClose,
totalDocsCount: queryResult.Results.TotalDocsCount(),
nowFn: s.nowFn,
fetchStart: startTime,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this supposed to include the index lookup? Looks like it does (line 835)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. it's total time of the fetch api call

dataReadMetrics: s.metrics.queryTimingDataRead,
totalMetrics: s.metrics.queryTimingFetchTagged,
}), nil
}

Expand Down Expand Up @@ -875,6 +904,12 @@ type fetchTaggedResultsIter struct {
tagEncoder serialize.TagEncoder
iOpts instrument.Options
instrumentClose func(error)
startTime time.Time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this set?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also what's the difference between this and fetchStart?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh it's not. clearly a test is needed for the metrics.

renamed this to dataReadStart to make it more clear.

nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
dataReadMetrics queryMetrics
totalMetrics queryMetrics
}

type fetchTaggedResultsIterOpts struct {
Expand All @@ -887,6 +922,11 @@ type fetchTaggedResultsIterOpts struct {
tagEncoder serialize.TagEncoder
iOpts instrument.Options
instrumentClose func(error)
nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
dataReadMetrics queryMetrics
totalMetrics queryMetrics
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
Expand All @@ -903,6 +943,11 @@ func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResul
tagEncoder: opts.tagEncoder,
iOpts: opts.iOpts,
instrumentClose: opts.instrumentClose,
totalDocsCount: opts.totalDocsCount,
nowFn: opts.nowFn,
fetchStart: opts.fetchStart,
dataReadMetrics: opts.dataReadMetrics,
totalMetrics: opts.totalMetrics,
}

return iter
Expand Down Expand Up @@ -983,6 +1028,14 @@ func (i *fetchTaggedResultsIter) Current() IDResult {

// Close finalizes the iterator: it invokes the injected instrumentClose hook
// with the terminal error and then records query timing metrics for both the
// data-read phase and the total fetch, each bucketed by query time range and
// by total documents matched.
func (i *fetchTaggedResultsIter) Close(err error) {
	i.instrumentClose(err)
	now := i.nowFn()
	// Elapsed time of the data-read phase.
	// NOTE(review): i.startTime is never assigned anywhere visible (the
	// constructor only copies fetchStart from opts), so this measures from the
	// zero time.Time and the dataRead metrics are bogus — confirm; the PR
	// discussion renames this field to dataReadStart and sets it explicitly.
	elapsed := now.Sub(i.startTime)
	i.dataReadMetrics.byRange.Record(i.endExclusive.Sub(i.startInclusive), elapsed)
	i.dataReadMetrics.byDocs.Record(i.totalDocsCount, elapsed)

	// Total elapsed time of the fetch call, including the index lookup,
	// measured from when the RPC handler started (fetchStart).
	totalElapsed := now.Sub(i.fetchStart)
	i.totalMetrics.byRange.Record(i.endExclusive.Sub(i.startInclusive), totalElapsed)
	i.totalMetrics.byDocs.Record(i.totalDocsCount, totalElapsed)
}

// IDResult is the FetchTagged result for a series ID.
Expand Down Expand Up @@ -1067,21 +1120,31 @@ func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest)
Exhaustive: queryResult.Exhaustive,
}
results := queryResult.Results
size := 0
for _, entry := range results.Map().Iter() {
size++
responseElem := &rpc.AggregateQueryResultTagNameElement{
TagName: entry.Key().String(),
}
tagValues := entry.Value()
tagValuesMap := tagValues.Map()
responseElem.TagValues = make([]*rpc.AggregateQueryResultTagValueElement, 0, tagValuesMap.Len())
for _, entry := range tagValuesMap.Iter() {
size++
responseElem.TagValues = append(responseElem.TagValues, &rpc.AggregateQueryResultTagValueElement{
TagValue: entry.Key().String(),
})
}
response.Results = append(response.Results, responseElem)
}
s.metrics.aggregate.ReportSuccess(s.nowFn().Sub(callStart))

duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregate
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.byRange.Record(rng, duration)
queryTiming.byDocs.Record(size, duration)

return response, nil
}

Expand Down Expand Up @@ -1111,7 +1174,9 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe
Exhaustive: queryResult.Exhaustive,
}
results := queryResult.Results
size := 0
for _, entry := range results.Map().Iter() {
size++
responseElem := &rpc.AggregateQueryRawResultTagNameElement{
TagName: entry.Key().Bytes(),
}
Expand All @@ -1120,13 +1185,21 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe
tagValuesMap := tagValues.Map()
responseElem.TagValues = make([]*rpc.AggregateQueryRawResultTagValueElement, 0, tagValuesMap.Len())
for _, entry := range tagValuesMap.Iter() {
size++
responseElem.TagValues = append(responseElem.TagValues, &rpc.AggregateQueryRawResultTagValueElement{
TagValue: entry.Key().Bytes(),
})
}
}
response.Results = append(response.Results, responseElem)
}

duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregateRaw
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.byRange.Record(rng, duration)
queryTiming.byDocs.Record(size, duration)

s.metrics.aggregate.ReportSuccess(s.nowFn().Sub(callStart))
return response, nil
}
Expand Down
39 changes: 39 additions & 0 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts/writes"
Expand Down Expand Up @@ -1565,6 +1566,8 @@ func (i *nsIndex) queryWithSpan(
cancellable := xresource.NewCancellableLifetime()
defer cancellable.Cancel()

indexMatchingStartTime := time.Now()

for _, block := range blocks {
// Capture block for async query execution below.
block := block
Expand Down Expand Up @@ -1665,6 +1668,17 @@ func (i *nsIndex) queryWithSpan(
if err != nil {
return false, err
}

queryRuntime := time.Since(indexMatchingStartTime)
queryRange := opts.EndExclusive.Sub(opts.StartInclusive)

i.metrics.queryTotalTimeByRange.Record(queryRange, queryRuntime)
i.metrics.queryTotalTimeByCardinality.Record(results.TotalDocsCount(), queryRuntime)
i.metrics.querySearchByRange.Record(queryRange, results.TotalDuration().Search)
i.metrics.querySearchByCardinality.Record(results.TotalDocsCount(), results.TotalDuration().Search)
i.metrics.queryProcessingTimeByRange.Record(queryRange, results.TotalDuration().Total)
i.metrics.queryProcessingTimeByCardinality.Record(results.TotalDocsCount(), results.TotalDuration().Total)

return exhaustive, nil
}

Expand Down Expand Up @@ -2211,6 +2225,25 @@ type nsIndexMetrics struct {
queryNonExhaustiveLimitError tally.Counter
queryNonExhaustiveSeriesLimitError tally.Counter
queryNonExhaustiveDocsLimitError tally.Counter

// query*Time metrics by both range and cardinality. byRange buckets by the query time window ([start,end]).
// byCardinality buckets by the # of documents returned by the query. byRange allows us to understand the impact
// of queries that look at many index blocks. byCardinality allows us to understand the impact of queries that
// return many documents from index blocks.

// the total time for a query, including waiting for processing resources.
queryTotalTimeByRange limits.QueryDurationMetrics
queryTotalTimeByCardinality limits.QueryCardinalityMetrics

// the total time a query was consuming processing resources. queryTotalTime - queryProcessing == time waiting for
// resources.
queryProcessingTimeByRange limits.QueryDurationMetrics
queryProcessingTimeByCardinality limits.QueryCardinalityMetrics

// the total time a query was searching for documents. queryProcessing - querySearch == time processing search
// results.
querySearchByRange limits.QueryDurationMetrics
querySearchByCardinality limits.QueryCardinalityMetrics
}

func newNamespaceIndexMetrics(
Expand Down Expand Up @@ -2306,6 +2339,12 @@ func newNamespaceIndexMetrics(
"exhaustive": "false",
"result": "error_docs_require_exhaustive",
}).Counter("query"),
queryTotalTimeByRange: limits.NewQueryRangeMetrics("query_total", scope),
queryTotalTimeByCardinality: limits.NewCardinalityMetrics("query_total", scope),
queryProcessingTimeByRange: limits.NewQueryRangeMetrics("query_processing", scope),
queryProcessingTimeByCardinality: limits.NewCardinalityMetrics("query_processing", scope),
querySearchByRange: limits.NewQueryRangeMetrics("query_search", scope),
querySearchByCardinality: limits.NewCardinalityMetrics("query_search", scope),
}

// Initialize gauges that should default to zero before
Expand Down
20 changes: 20 additions & 0 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package index
import (
"fmt"
"sync"
"time"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
Expand Down Expand Up @@ -52,6 +53,7 @@ type aggregatedResults struct {
valuesPool AggregateValuesPool

encodedDocReader docs.EncodedDocumentReader
resultDuration ResultDurations
}

// NewAggregateResults returns a new AggregateResults object.
Expand All @@ -71,6 +73,24 @@ func NewAggregateResults(
}
}

// TotalDuration returns a snapshot of the durations accumulated so far for
// these aggregate results. Safe for concurrent use.
func (r *aggregatedResults) TotalDuration() ResultDurations {
	r.RLock()
	durations := r.resultDuration
	r.RUnlock()
	return durations
}

// AddBlockTotalDuration folds one block's total query duration into the
// accumulated result durations. Safe for concurrent use.
func (r *aggregatedResults) AddBlockTotalDuration(duration time.Duration) {
	r.Lock()
	r.resultDuration = r.resultDuration.AddTotal(duration)
	r.Unlock()
}

// AddBlockSearchDuration folds one block's search duration into the
// accumulated result durations. Safe for concurrent use.
func (r *aggregatedResults) AddBlockSearchDuration(duration time.Duration) {
	r.Lock()
	r.resultDuration = r.resultDuration.AddSearch(duration)
	r.Unlock()
}

func (r *aggregatedResults) EnforceLimits() bool { return true }

func (r *aggregatedResults) Reset(
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,12 @@ func (b *block) Query(
sp.LogFields(logFields...)
defer sp.Finish()

start := time.Now()
exhaustive, err := b.queryWithSpan(ctx, cancellable, query, opts, results, sp, logFields)
if err != nil {
sp.LogFields(opentracinglog.Error(err))
}
results.AddBlockTotalDuration(time.Since(start))

return exhaustive, err
}
Expand Down Expand Up @@ -529,6 +531,8 @@ func (b *block) queryWithSpan(
return false, err
}

results.AddBlockSearchDuration(iter.SearchDuration())

return opts.exhaustive(size, docsCount), nil
}

Expand Down
Loading