Instrument query path #3182

Merged: 15 commits, Feb 7, 2021
109 changes: 72 additions & 37 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -123,6 +123,15 @@ type serviceMetrics struct {
writeTaggedBatchRawRPCs tally.Counter
writeTaggedBatchRaw instrument.BatchMethodMetrics
overloadRejected tally.Counter

// the total time to call FetchTagged, both querying the index and reading data results (if requested).
queryTimingFetchTagged index.QueryMetrics
// the total time to read data blocks.
queryTimingDataRead index.QueryMetrics
// the total time to call Aggregate.
queryTimingAggregate index.QueryMetrics
// the total time to call AggregateRaw.
queryTimingAggregateRaw index.QueryMetrics
}

func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceMetrics {
@@ -143,6 +152,11 @@ func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceM
writeTaggedBatchRawRPCs: scope.Counter("writeTaggedBatchRaw-rpcs"),
writeTaggedBatchRaw: instrument.NewBatchMethodMetrics(scope, "writeTaggedBatchRaw", opts),
overloadRejected: scope.Counter("overload-rejected"),

queryTimingFetchTagged: index.NewQueryMetrics("fetch_tagged", scope),
queryTimingAggregate: index.NewQueryMetrics("aggregate", scope),
queryTimingAggregateRaw: index.NewQueryMetrics("aggregate_raw", scope),
queryTimingDataRead: index.NewQueryMetrics("data_read", scope),
}
}
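Note for readers: `index.QueryMetrics` is only used through `NewQueryMetrics(name, scope)`, `ByRange.Record(queryRange, took)` and `ByDocs.Record(docsCount, took)` in this PR. The sketch below is a shape *inferred from that usage* — the type names `QueryRangeMetrics`/`QueryDocsMetrics`, the bucket thresholds, and the metric names are assumptions, not the real definition in `src/dbnode/storage/index`.

```go
// Sketch only: a QueryMetrics shape compatible with how it is used in this PR.
// Bucket boundaries and metric names are invented for illustration.
package index

import (
	"time"

	"github.com/uber-go/tally"
)

// QueryMetrics records query latency sliced two ways.
type QueryMetrics struct {
	ByRange QueryRangeMetrics // latency bucketed by the queried time range
	ByDocs  QueryDocsMetrics  // latency bucketed by matched document count
}

// QueryRangeMetrics buckets latency by the width of the query's time range.
type QueryRangeMetrics struct {
	small, medium, large tally.Timer
}

// Record records a query that covered queryRange and took 'took'.
func (m QueryRangeMetrics) Record(queryRange, took time.Duration) {
	switch {
	case queryRange <= time.Hour:
		m.small.Record(took)
	case queryRange <= 24*time.Hour:
		m.medium.Record(took)
	default:
		m.large.Record(took)
	}
}

// QueryDocsMetrics buckets latency by how many documents the query matched.
type QueryDocsMetrics struct {
	few, some, many tally.Timer
}

// Record records a query that matched docs documents and took 'took'.
func (m QueryDocsMetrics) Record(docs int, took time.Duration) {
	switch {
	case docs <= 1000:
		m.few.Record(took)
	case docs <= 100000:
		m.some.Record(took)
	default:
		m.many.Record(took)
	}
}

// NewQueryMetrics builds the timers under the given name, e.g. "fetch_tagged".
func NewQueryMetrics(name string, scope tally.Scope) QueryMetrics {
	sub := scope.SubScope(name)
	return QueryMetrics{
		ByRange: QueryRangeMetrics{
			small:  sub.Tagged(map[string]string{"range": "small"}).Timer("latency_by_range"),
			medium: sub.Tagged(map[string]string{"range": "medium"}).Timer("latency_by_range"),
			large:  sub.Tagged(map[string]string{"range": "large"}).Timer("latency_by_range"),
		},
		ByDocs: QueryDocsMetrics{
			few:  sub.Tagged(map[string]string{"docs": "few"}).Timer("latency_by_docs"),
			some: sub.Tagged(map[string]string{"docs": "some"}).Timer("latency_by_docs"),
			many: sub.Tagged(map[string]string{"docs": "many"}).Timer("latency_by_docs"),
		},
	}
}
```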

@@ -803,6 +817,7 @@ func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
s.readRPCCompleted()
}))

startTime := s.nowFn()
ns, query, opts, fetchData, err := convert.FromRPCFetchTaggedRequest(req, s.pools)
if err != nil {
return nil, tterrors.NewBadRequestError(err)
@@ -826,6 +841,11 @@
tagEncoder: tagEncoder,
iOpts: s.opts.InstrumentOptions(),
instrumentClose: instrumentClose,
totalDocsCount: queryResult.Results.TotalDocsCount(),
nowFn: s.nowFn,
fetchStart: startTime,
Collaborator:
Is this supposed to include the index lookup? Looks like it does (line 835).

Collaborator (Author):
Yes, it's the total time of the fetch API call.
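To make the split concrete: `fetchStart` is taken before the index query runs, while `dataReadStart` is taken when the results iterator is constructed, so `Close` can attribute the two durations. The field names below come from this diff; the helper and the example numbers are purely illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// recordFetchTimings mirrors what fetchTaggedResultsIter.Close does with the
// two timestamps added in this PR; the helper itself is illustrative only.
func recordFetchTimings(fetchStart, dataReadStart, closedAt time.Time) (total, dataRead time.Duration) {
	total = closedAt.Sub(fetchStart)       // index query + data block reads -> queryTimingFetchTagged
	dataRead = closedAt.Sub(dataReadStart) // data block reads only          -> queryTimingDataRead
	return total, dataRead
}

func main() {
	fetchStart := time.Now()
	dataReadStart := fetchStart.Add(30 * time.Millisecond) // index lookup took ~30ms
	closedAt := dataReadStart.Add(120 * time.Millisecond)  // reading data blocks took ~120ms

	total, dataRead := recordFetchTimings(fetchStart, dataReadStart, closedAt)
	fmt.Println(total, dataRead) // 150ms total, 120ms of that spent reading data
}
```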

dataReadMetrics: s.metrics.queryTimingDataRead,
totalMetrics: s.metrics.queryTimingFetchTagged,
}), nil
}

@@ -859,22 +879,13 @@ type FetchTaggedResultsIter interface {
}

type fetchTaggedResultsIter struct {
queryResults *index.ResultsMap
idResults []idResult
startInclusive time.Time
endExclusive time.Time
db storage.Database
idx int
blockReadIdx int
exhaustive bool
fetchData bool
cur IDResult
err error
nsID ident.ID
docReader *docs.EncodedDocumentReader
tagEncoder serialize.TagEncoder
iOpts instrument.Options
instrumentClose func(error)
fetchTaggedResultsIterOpts
idResults []idResult
dataReadStart time.Time
idx int
blockReadIdx int
cur IDResult
err error
}

type fetchTaggedResultsIterOpts struct {
@@ -887,33 +898,27 @@ type fetchTaggedResultsIterOpts struct {
tagEncoder serialize.TagEncoder
iOpts instrument.Options
instrumentClose func(error)
nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
dataReadMetrics index.QueryMetrics
totalMetrics index.QueryMetrics
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
iter := &fetchTaggedResultsIter{
queryResults: opts.queryResult.Results.Map(),
idResults: make([]idResult, 0, opts.queryResult.Results.Map().Len()),
exhaustive: opts.queryResult.Exhaustive,
db: opts.db,
fetchData: opts.fetchData,
startInclusive: opts.queryOpts.StartInclusive,
endExclusive: opts.queryOpts.EndExclusive,
nsID: opts.nsID,
docReader: opts.docReader,
tagEncoder: opts.tagEncoder,
iOpts: opts.iOpts,
instrumentClose: opts.instrumentClose,
return &fetchTaggedResultsIter{
fetchTaggedResultsIterOpts: opts,
idResults: make([]idResult, 0, opts.queryResult.Results.Map().Len()),
dataReadStart: opts.nowFn(),
}

return iter
}

func (i *fetchTaggedResultsIter) NumIDs() int {
return i.queryResults.Len()
return i.queryResult.Results.Map().Len()
}

func (i *fetchTaggedResultsIter) Exhaustive() bool {
return i.exhaustive
return i.queryResult.Exhaustive
}

func (i *fetchTaggedResultsIter) Namespace() ident.ID {
@@ -923,7 +928,7 @@ func (i *fetchTaggedResultsIter) Namespace() ident.ID {
func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
// initialize the iterator state on the first fetch.
if i.idx == 0 {
for _, entry := range i.queryResults.Iter() { // nolint: gocritic
for _, entry := range i.queryResult.Results.Map().Iter() { // nolint: gocritic
result := idResult{
queryResult: entry,
docReader: i.docReader,
@@ -935,7 +940,8 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
// copied by the blockRetriever in the streamRequest method when
// it checks if the ID is finalizeable or not with IsNoFinalize.
id := ident.BytesID(result.queryResult.Key())
result.blockReadersIter, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.startInclusive, i.endExclusive)
result.blockReadersIter, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.queryOpts.StartInclusive,
i.queryOpts.EndExclusive)
if i.err != nil {
return false
}
@@ -944,14 +950,14 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
}
}

if i.idx == i.queryResults.Len() {
if i.idx == i.queryResult.Results.Map().Len() {
return false
}

if i.fetchData {
// ensure the blockReaders exist for the current series ID. additionally try to prefetch additional blockReaders
// for future seriesID to pipeline the disk reads.
for i.blockReadIdx < i.queryResults.Len() {
for i.blockReadIdx < i.queryResult.Results.Map().Len() {
var blockReaders [][]xio.BlockReader
blockIter := i.idResults[i.blockReadIdx].blockReadersIter

@@ -983,6 +989,15 @@ func (i *fetchTaggedResultsIter) Current() IDResult {

func (i *fetchTaggedResultsIter) Close(err error) {
i.instrumentClose(err)
queryRange := i.queryOpts.EndExclusive.Sub(i.queryOpts.StartInclusive)
now := i.nowFn()
dataReadTime := now.Sub(i.dataReadStart)
i.dataReadMetrics.ByRange.Record(queryRange, dataReadTime)
i.dataReadMetrics.ByDocs.Record(i.totalDocsCount, dataReadTime)

totalFetchTime := now.Sub(i.fetchStart)
i.totalMetrics.ByRange.Record(queryRange, totalFetchTime)
i.totalMetrics.ByDocs.Record(i.totalDocsCount, totalFetchTime)
}

// IDResult is the FetchTagged result for a series ID.
@@ -1067,21 +1082,31 @@ func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest)
Exhaustive: queryResult.Exhaustive,
}
results := queryResult.Results
size := 0
for _, entry := range results.Map().Iter() {
size++
responseElem := &rpc.AggregateQueryResultTagNameElement{
TagName: entry.Key().String(),
}
tagValues := entry.Value()
tagValuesMap := tagValues.Map()
responseElem.TagValues = make([]*rpc.AggregateQueryResultTagValueElement, 0, tagValuesMap.Len())
for _, entry := range tagValuesMap.Iter() {
size++
responseElem.TagValues = append(responseElem.TagValues, &rpc.AggregateQueryResultTagValueElement{
TagValue: entry.Key().String(),
})
}
response.Results = append(response.Results, responseElem)
}
s.metrics.aggregate.ReportSuccess(s.nowFn().Sub(callStart))

duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregate
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.ByRange.Record(rng, duration)
queryTiming.ByDocs.Record(size, duration)

return response, nil
}

@@ -1111,7 +1136,9 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe
Exhaustive: queryResult.Exhaustive,
}
results := queryResult.Results
size := 0
for _, entry := range results.Map().Iter() {
size++
responseElem := &rpc.AggregateQueryRawResultTagNameElement{
TagName: entry.Key().Bytes(),
}
@@ -1120,13 +1147,21 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe
tagValuesMap := tagValues.Map()
responseElem.TagValues = make([]*rpc.AggregateQueryRawResultTagValueElement, 0, tagValuesMap.Len())
for _, entry := range tagValuesMap.Iter() {
size++
responseElem.TagValues = append(responseElem.TagValues, &rpc.AggregateQueryRawResultTagValueElement{
TagValue: entry.Key().Bytes(),
})
}
}
response.Results = append(response.Results, responseElem)
}

duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregateRaw
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.ByRange.Record(rng, duration)
queryTiming.ByDocs.Record(size, duration)

s.metrics.aggregate.ReportSuccess(s.nowFn().Sub(callStart))
return response, nil
}
41 changes: 41 additions & 0 deletions src/dbnode/storage/index.go
@@ -1565,6 +1565,10 @@ func (i *nsIndex) queryWithSpan(
cancellable := xresource.NewCancellableLifetime()
defer cancellable.Cancel()

indexMatchingStartTime := time.Now()
var totalWaitTime time.Duration
var waitTimeLock sync.Mutex

for _, block := range blocks {
// Capture block for async query execution below.
block := block
@@ -1590,7 +1594,11 @@
if applyTimeout := timeout > 0; !applyTimeout {
// No timeout, just wait blockingly for a worker.
wg.Add(1)
enqueueTime := time.Now()
i.queryWorkersPool.Go(func() {
waitTimeLock.Lock()
totalWaitTime += time.Since(enqueueTime)
waitTimeLock.Unlock()
execBlockFn(ctx, cancellable, block, query, opts, &state, results, logFields)
wg.Done()
})
@@ -1601,7 +1609,11 @@
var timedOut bool
if timeLeft := deadline.Sub(i.nowFn()); timeLeft > 0 {
wg.Add(1)
enqueueTime := time.Now()
timedOut := !i.queryWorkersPool.GoWithTimeout(func() {
waitTimeLock.Lock()
totalWaitTime += time.Since(enqueueTime)
waitTimeLock.Unlock()
execBlockFn(ctx, cancellable, block, query, opts, &state, results, logFields)
wg.Done()
}, timeLeft)
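The wait-time accounting in both branches above follows one pattern: note the enqueue time just before handing the closure to the worker pool, then add the elapsed time once the worker actually starts, guarded by a mutex because blocks run concurrently. A self-contained sketch of the same pattern, with a hypothetical channel-based pool standing in for `queryWorkersPool` and a sleep standing in for `execBlockFn`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Hypothetical worker pool with 2 slots, standing in for queryWorkersPool.
	workers := make(chan struct{}, 2)

	var (
		wg            sync.WaitGroup
		waitTimeLock  sync.Mutex
		totalWaitTime time.Duration
	)

	for i := 0; i < 8; i++ {
		wg.Add(1)
		enqueueTime := time.Now()
		go func() {
			defer wg.Done()
			workers <- struct{}{}        // block until a worker slot frees up
			defer func() { <-workers }() // release the slot when done

			// Time this block spent queued, waiting for a worker to pick it up.
			waitTimeLock.Lock()
			totalWaitTime += time.Since(enqueueTime)
			waitTimeLock.Unlock()

			time.Sleep(10 * time.Millisecond) // stand-in for execBlockFn
		}()
	}

	wg.Wait()
	fmt.Println("cumulative wait across blocks:", totalWaitTime)
}
```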
@@ -1665,6 +1677,19 @@
if err != nil {
return false, err
}

queryRuntime := time.Since(indexMatchingStartTime)
queryRange := opts.EndExclusive.Sub(opts.StartInclusive)

i.metrics.queryTotalTime.ByRange.Record(queryRange, queryRuntime)
i.metrics.queryTotalTime.ByDocs.Record(results.TotalDocsCount(), queryRuntime)
i.metrics.queryWaitTime.ByRange.Record(queryRange, totalWaitTime)
i.metrics.queryWaitTime.ByDocs.Record(results.TotalDocsCount(), totalWaitTime)
i.metrics.queryProcessingTime.ByRange.Record(queryRange, results.TotalDuration().Processing)
i.metrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Processing)
i.metrics.querySearchTime.ByRange.Record(queryRange, results.TotalDuration().Search)
i.metrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Search)

return exhaustive, nil
}

@@ -2211,6 +2236,18 @@ type nsIndexMetrics struct {
queryNonExhaustiveLimitError tally.Counter
queryNonExhaustiveSeriesLimitError tally.Counter
queryNonExhaustiveDocsLimitError tally.Counter

// the total time for a query, including waiting for processing resources.
queryTotalTime index.QueryMetrics
// the time spent waiting for workers to start processing the query. totalTime - waitTime = processing time. this is
// not necessarily equal to processingTime below since a query can use multiple workers at once.
queryWaitTime index.QueryMetrics
// the total time a query was consuming processing resources. this can actually be greater than queryTotalTime
// if the query uses multiple CPUs.
queryProcessingTime index.QueryMetrics
// the total time a query was searching for documents. queryProcessingTime - querySearchTime == time processing
// search results.
querySearchTime index.QueryMetrics
}
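The relationships described in the comments above are easiest to see with numbers. The metric names come from this diff; every figure below is invented, for a hypothetical query whose two index blocks run concurrently on two workers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	queryTotalTime := 100 * time.Millisecond // wall-clock time of the whole query
	queryWaitTime := 20 * time.Millisecond   // cumulative time blocks sat queued for a worker

	// Per-block processing time is summed across blocks, so with two blocks
	// running concurrently the sum (150ms) can exceed the wall-clock total.
	queryProcessingTime := 70*time.Millisecond + 80*time.Millisecond

	// The search portion of that processing; the remaining 60ms was spent
	// handling the search results.
	querySearchTime := 40*time.Millisecond + 50*time.Millisecond

	fmt.Println(queryTotalTime, queryWaitTime, queryProcessingTime, querySearchTime)
}
```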

func newNamespaceIndexMetrics(
@@ -2306,6 +2343,10 @@
"exhaustive": "false",
"result": "error_docs_require_exhaustive",
}).Counter("query"),
queryTotalTime: index.NewQueryMetrics("query_total", scope),
queryWaitTime: index.NewQueryMetrics("query_wait", scope),
queryProcessingTime: index.NewQueryMetrics("query_processing", scope),
querySearchTime: index.NewQueryMetrics("query_search", scope),
}

// Initialize gauges that should default to zero before
22 changes: 22 additions & 0 deletions src/dbnode/storage/index/aggregate_results.go
@@ -23,6 +23,7 @@ package index
import (
"fmt"
"sync"
"time"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
@@ -52,6 +53,7 @@ type aggregatedResults struct {
valuesPool AggregateValuesPool

encodedDocReader docs.EncodedDocumentReader
resultDuration ResultDurations
}

// NewAggregateResults returns a new AggregateResults object.
@@ -71,6 +73,24 @@ func NewAggregateResults(
}
}

func (r *aggregatedResults) TotalDuration() ResultDurations {
r.RLock()
defer r.RUnlock()
return r.resultDuration
}

func (r *aggregatedResults) AddBlockProcessingDuration(duration time.Duration) {
r.Lock()
defer r.Unlock()
r.resultDuration = r.resultDuration.AddProcessing(duration)
}

func (r *aggregatedResults) AddBlockSearchDuration(duration time.Duration) {
r.Lock()
defer r.Unlock()
r.resultDuration = r.resultDuration.AddSearch(duration)
}

func (r *aggregatedResults) EnforceLimits() bool { return true }

func (r *aggregatedResults) Reset(
@@ -102,6 +122,8 @@ func (r *aggregatedResults) Reset(
r.resultsMap.Reset()
r.totalDocsCount = 0

r.resultDuration = ResultDurations{}

// NB: could do keys+value in one step but I'm trying to avoid
// using an internal method of a code-gen'd type.
r.Unlock()
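The `ResultDurations` value accumulated above only appears through its call sites in this diff (`AddProcessing`, `AddSearch`, `TotalDuration().Processing/.Search`). A minimal sketch of a compatible definition, inferred from that usage — the real type may differ:

```go
// Sketch only: inferred from how ResultDurations is used in this PR.
package index

import "time"

// ResultDurations tracks cumulative time spent producing a query result set.
type ResultDurations struct {
	// Processing is the total time blocks spent executing the query,
	// summed across blocks (and therefore across workers).
	Processing time.Duration
	// Search is the portion of Processing spent inside the segment search.
	Search time.Duration
}

// AddProcessing returns a copy with a block's processing time added.
func (r ResultDurations) AddProcessing(d time.Duration) ResultDurations {
	r.Processing += d
	return r
}

// AddSearch returns a copy with a block's search time added.
func (r ResultDurations) AddSearch(d time.Duration) ResultDurations {
	r.Search += d
	return r
}
```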
4 changes: 4 additions & 0 deletions src/dbnode/storage/index/block.go
@@ -419,10 +419,12 @@ func (b *block) Query(
sp.LogFields(logFields...)
defer sp.Finish()

start := time.Now()
exhaustive, err := b.queryWithSpan(ctx, cancellable, query, opts, results, sp, logFields)
if err != nil {
sp.LogFields(opentracinglog.Error(err))
}
results.AddBlockProcessingDuration(time.Since(start))

return exhaustive, err
}
@@ -529,6 +531,8 @@ func (b *block) queryWithSpan(
return false, err
}

results.AddBlockSearchDuration(iter.SearchDuration())

return opts.exhaustive(size, docsCount), nil
}
