reuse composite QueryMetrics in service
ryanhall07 committed Feb 6, 2021
1 parent 0fb1e29 commit 07960d6
Showing 3 changed files with 78 additions and 116 deletions.
126 changes: 44 additions & 82 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -124,23 +124,14 @@ type serviceMetrics struct {
writeTaggedBatchRaw instrument.BatchMethodMetrics
overloadRejected tally.Counter

queryTimingFetchTagged queryMetrics
queryTimingAggregate queryMetrics
queryTimingAggregateRaw queryMetrics

queryTimingDataRead queryMetrics
}

func newQueryMetrics(name string, scope tally.Scope) queryMetrics {
return queryMetrics{
byRange: limits.NewQueryRangeMetrics(name, scope),
byDocs: limits.NewCardinalityMetrics(name, scope),
}
}

type queryMetrics struct {
byRange limits.QueryDurationMetrics
byDocs limits.QueryCardinalityMetrics
// the total time to call FetchTagged, both querying the index and reading data results (if requested).
queryTimingFetchTagged index.QueryMetrics
// the total time to read data blocks.
queryTimingDataRead index.QueryMetrics
// the total time to call Aggregate.
queryTimingAggregate index.QueryMetrics
// the total time to call AggregateRaw.
queryTimingAggregateRaw index.QueryMetrics
}

func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceMetrics {
@@ -162,10 +153,10 @@ func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceMetrics {
writeTaggedBatchRaw: instrument.NewBatchMethodMetrics(scope, "writeTaggedBatchRaw", opts),
overloadRejected: scope.Counter("overload-rejected"),

queryTimingFetchTagged: newQueryMetrics("fetch_tagged", scope),
queryTimingAggregate: newQueryMetrics("aggregate", scope),
queryTimingAggregateRaw: newQueryMetrics("aggregate_raw", scope),
queryTimingDataRead: newQueryMetrics("data_read", scope),
queryTimingFetchTagged: index.NewQueryMetrics("fetch_tagged", scope),
queryTimingAggregate: index.NewQueryMetrics("aggregate", scope),
queryTimingAggregateRaw: index.NewQueryMetrics("aggregate_raw", scope),
queryTimingDataRead: index.NewQueryMetrics("data_read", scope),
}
}

@@ -888,28 +879,13 @@ type FetchTaggedResultsIter interface {
}

type fetchTaggedResultsIter struct {
queryResults *index.ResultsMap
idResults []idResult
startInclusive time.Time
endExclusive time.Time
db storage.Database
idx int
blockReadIdx int
exhaustive bool
fetchData bool
cur IDResult
err error
nsID ident.ID
docReader *docs.EncodedDocumentReader
tagEncoder serialize.TagEncoder
iOpts instrument.Options
instrumentClose func(error)
startTime time.Time
nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
dataReadMetrics queryMetrics
totalMetrics queryMetrics
fetchTaggedResultsIterOpts
idResults []idResult
dataReadStart time.Time
idx int
blockReadIdx int
cur IDResult
err error
}

type fetchTaggedResultsIterOpts struct {
@@ -925,40 +901,24 @@ type fetchTaggedResultsIterOpts struct {
nowFn clock.NowFn
fetchStart time.Time
totalDocsCount int
dataReadMetrics queryMetrics
totalMetrics queryMetrics
dataReadMetrics index.QueryMetrics
totalMetrics index.QueryMetrics
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
iter := &fetchTaggedResultsIter{
queryResults: opts.queryResult.Results.Map(),
idResults: make([]idResult, 0, opts.queryResult.Results.Map().Len()),
exhaustive: opts.queryResult.Exhaustive,
db: opts.db,
fetchData: opts.fetchData,
startInclusive: opts.queryOpts.StartInclusive,
endExclusive: opts.queryOpts.EndExclusive,
nsID: opts.nsID,
docReader: opts.docReader,
tagEncoder: opts.tagEncoder,
iOpts: opts.iOpts,
instrumentClose: opts.instrumentClose,
totalDocsCount: opts.totalDocsCount,
nowFn: opts.nowFn,
fetchStart: opts.fetchStart,
dataReadMetrics: opts.dataReadMetrics,
totalMetrics: opts.totalMetrics,
}

return iter
return &fetchTaggedResultsIter{
fetchTaggedResultsIterOpts: opts,
idResults: make([]idResult, 0, opts.queryResult.Results.Map().Len()),
dataReadStart: opts.nowFn(),
}
}

func (i *fetchTaggedResultsIter) NumIDs() int {
return i.queryResults.Len()
return i.queryResult.Results.Map().Len()
}

func (i *fetchTaggedResultsIter) Exhaustive() bool {
return i.exhaustive
return i.queryResult.Exhaustive
}

func (i *fetchTaggedResultsIter) Namespace() ident.ID {
@@ -968,7 +928,7 @@ func (i *fetchTaggedResultsIter) Namespace() ident.ID {
func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
// initialize the iterator state on the first fetch.
if i.idx == 0 {
for _, entry := range i.queryResults.Iter() { // nolint: gocritic
for _, entry := range i.queryResult.Results.Map().Iter() { // nolint: gocritic
result := idResult{
queryResult: entry,
docReader: i.docReader,
@@ -980,7 +940,8 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
// copied by the blockRetriever in the streamRequest method when
// it checks if the ID is finalizeable or not with IsNoFinalize.
id := ident.BytesID(result.queryResult.Key())
result.blockReadersIter, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.startInclusive, i.endExclusive)
result.blockReadersIter, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.queryOpts.StartInclusive,
i.queryOpts.EndExclusive)
if i.err != nil {
return false
}
@@ -989,14 +950,14 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {
}
}

if i.idx == i.queryResults.Len() {
if i.idx == i.queryResult.Results.Map().Len() {
return false
}

if i.fetchData {
// ensure the blockReaders exist for the current series ID. additionally try to prefetch additional blockReaders
// for future seriesID to pipeline the disk reads.
for i.blockReadIdx < i.queryResults.Len() {
for i.blockReadIdx < i.queryResult.Results.Map().Len() {
var blockReaders [][]xio.BlockReader
blockIter := i.idResults[i.blockReadIdx].blockReadersIter

@@ -1028,14 +989,15 @@ func (i *fetchTaggedResultsIter) Current() IDResult {

func (i *fetchTaggedResultsIter) Close(err error) {
i.instrumentClose(err)
queryRange := i.queryOpts.EndExclusive.Sub(i.queryOpts.StartInclusive)
now := i.nowFn()
elapsed := now.Sub(i.startTime)
i.dataReadMetrics.byRange.Record(i.endExclusive.Sub(i.startInclusive), elapsed)
i.dataReadMetrics.byDocs.Record(i.totalDocsCount, elapsed)
dataReadTime := now.Sub(i.dataReadStart)
i.dataReadMetrics.ByRange.Record(queryRange, dataReadTime)
i.dataReadMetrics.ByDocs.Record(i.totalDocsCount, dataReadTime)

totalElapsed := now.Sub(i.fetchStart)
i.totalMetrics.byRange.Record(i.endExclusive.Sub(i.startInclusive), totalElapsed)
i.totalMetrics.byDocs.Record(i.totalDocsCount, totalElapsed)
totalFetchTime := now.Sub(i.fetchStart)
i.totalMetrics.ByRange.Record(queryRange, totalFetchTime)
i.totalMetrics.ByDocs.Record(i.totalDocsCount, totalFetchTime)
}

// IDResult is the FetchTagged result for a series ID.
@@ -1142,8 +1104,8 @@ func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest)
duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregate
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.byRange.Record(rng, duration)
queryTiming.byDocs.Record(size, duration)
queryTiming.ByRange.Record(rng, duration)
queryTiming.ByDocs.Record(size, duration)

return response, nil
}
@@ -1197,8 +1159,8 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRequest)
duration := s.nowFn().Sub(callStart)
queryTiming := s.metrics.queryTimingAggregateRaw
rng := time.Duration(req.RangeEnd - req.RangeStart)
queryTiming.byRange.Record(rng, duration)
queryTiming.byDocs.Record(size, duration)
queryTiming.ByRange.Record(rng, duration)
queryTiming.ByDocs.Record(size, duration)

s.metrics.aggregate.ReportSuccess(s.nowFn().Sub(callStart))
return response, nil
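
As context for the Close and Aggregate changes above, a minimal self-contained sketch of the recording pattern the service now uses: construct a composite index.QueryMetrics once, then record the same elapsed duration against both the range view and the document-count view. The scope, timestamps, and counts below are illustrative only, not taken from the commit.

package main

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage/index"
	"github.com/uber-go/tally"
)

func main() {
	scope := tally.NoopScope

	// Constructed once, as newServiceMetrics does in the diff above.
	dataRead := index.NewQueryMetrics("data_read", scope)
	fetchTagged := index.NewQueryMetrics("fetch_tagged", scope)

	// Hypothetical timestamps: fetchStart marks the start of the FetchTagged
	// call, dataReadStart marks when the results iterator was created.
	now := time.Now()
	fetchStart := now.Add(-900 * time.Millisecond)
	dataReadStart := now.Add(-400 * time.Millisecond)

	queryRange := 2 * time.Hour // EndExclusive - StartInclusive
	totalDocsCount := 1500

	// Data-read timing covers only reading data blocks for matched series.
	dataReadTime := now.Sub(dataReadStart)
	dataRead.ByRange.Record(queryRange, dataReadTime)
	dataRead.ByDocs.Record(totalDocsCount, dataReadTime)

	// Total timing covers the whole FetchTagged call, index query included.
	totalFetchTime := now.Sub(fetchStart)
	fetchTagged.ByRange.Record(queryRange, totalFetchTime)
	fetchTagged.ByDocs.Record(totalDocsCount, totalFetchTime)
}
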
46 changes: 16 additions & 30 deletions src/dbnode/storage/index.go
@@ -43,7 +43,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts/writes"
@@ -1672,12 +1671,12 @@ func (i *nsIndex) queryWithSpan(
queryRuntime := time.Since(indexMatchingStartTime)
queryRange := opts.EndExclusive.Sub(opts.StartInclusive)

i.metrics.queryTotalTimeByRange.Record(queryRange, queryRuntime)
i.metrics.queryTotalTimeByCardinality.Record(results.TotalDocsCount(), queryRuntime)
i.metrics.querySearchByRange.Record(queryRange, results.TotalDuration().Search)
i.metrics.querySearchByCardinality.Record(results.TotalDocsCount(), results.TotalDuration().Search)
i.metrics.queryProcessingTimeByRange.Record(queryRange, results.TotalDuration().Total)
i.metrics.queryProcessingTimeByCardinality.Record(results.TotalDocsCount(), results.TotalDuration().Total)
i.metrics.queryTotalTime.ByRange.Record(queryRange, queryRuntime)
i.metrics.queryTotalTime.ByDocs.Record(results.TotalDocsCount(), queryRuntime)
i.metrics.queryProcessingTime.ByRange.Record(queryRange, results.TotalDuration().Total)
i.metrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Total)
i.metrics.querySearchTime.ByRange.Record(queryRange, results.TotalDuration().Search)
i.metrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Search)

return exhaustive, nil
}
@@ -2226,24 +2225,14 @@ type nsIndexMetrics struct {
queryNonExhaustiveSeriesLimitError tally.Counter
queryNonExhaustiveDocsLimitError tally.Counter

// query*Time metrics by both range and cardinality. byRange buckets by the query time window ([start,end]).
// byCardinality buckets by the # of documents returned by the query. byRange allows us to understand the impact
// of queries that look at many index blocks. byCardinality allows us to understand the impact of queries that
// return many documents from index blocks.

// the total time for a query, including waiting for processing resources.
queryTotalTimeByRange limits.QueryDurationMetrics
queryTotalTimeByCardinality limits.QueryCardinalityMetrics

// the total time a query was consuming processing resources. queryTotalTime - queryProcessing == time waiting for
// resources.
queryProcessingTimeByRange limits.QueryDurationMetrics
queryProcessingTimeByCardinality limits.QueryCardinalityMetrics

// the total time a query was searching for documents. queryProcessing - querySearch == time processing search
// results.
querySearchByRange limits.QueryDurationMetrics
querySearchByCardinality limits.QueryCardinalityMetrics
queryTotalTime index.QueryMetrics
// the total time a query was consuming processing resources. queryTotalTime - queryProcessingTime == time waiting
// for resources.
queryProcessingTime index.QueryMetrics
// the total time a query was searching for documents. queryProcessingTime - querySearchTime == time processing
// search results.
querySearchTime index.QueryMetrics
}

func newNamespaceIndexMetrics(
@@ -2339,12 +2328,9 @@ func newNamespaceIndexMetrics(
"exhaustive": "false",
"result": "error_docs_require_exhaustive",
}).Counter("query"),
queryTotalTimeByRange: limits.NewQueryRangeMetrics("query_total", scope),
queryTotalTimeByCardinality: limits.NewCardinalityMetrics("query_total", scope),
queryProcessingTimeByRange: limits.NewQueryRangeMetrics("query_processing", scope),
queryProcessingTimeByCardinality: limits.NewCardinalityMetrics("query_processing", scope),
querySearchByRange: limits.NewQueryRangeMetrics("query_search", scope),
querySearchByCardinality: limits.NewCardinalityMetrics("query_search", scope),
queryTotalTime: index.NewQueryMetrics("query_total", scope),
queryProcessingTime: index.NewQueryMetrics("query_processing", scope),
querySearchTime: index.NewQueryMetrics("query_search", scope),
}

// Initialize gauges that should default to zero before
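
The comments on nsIndexMetrics above define the three timings by their differences. A small illustrative calculation (hypothetical durations, not measurements from the commit) makes the decomposition explicit:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical timings for a single index query.
	queryTotalTime := 900 * time.Millisecond      // includes waiting for processing resources
	queryProcessingTime := 600 * time.Millisecond // time spent consuming processing resources
	querySearchTime := 450 * time.Millisecond     // time spent searching for documents

	waitingForResources := queryTotalTime - queryProcessingTime      // 300ms waiting for resources
	processingSearchResults := queryProcessingTime - querySearchTime // 150ms processing search results

	fmt.Println(waitingForResources, processingSearchResults)
}
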
@@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package limits
package index

import (
"math"
@@ -29,6 +29,20 @@ import (
"github.com/m3db/m3/src/x/instrument"
)

// NewQueryMetrics returns a new QueryMetrics.
func NewQueryMetrics(name string, scope tally.Scope) QueryMetrics {
return QueryMetrics{
ByRange: NewQueryRangeMetrics(name, scope),
ByDocs: NewCardinalityMetrics(name, scope),
}
}

// QueryMetrics is a composite type of QueryDurationMetrics and QueryCardinalityMetrics.
type QueryMetrics struct {
ByRange QueryDurationMetrics
ByDocs QueryCardinalityMetrics
}

type queryRangeHist struct {
threshold time.Duration
timing tally.Histogram
@@ -81,7 +95,7 @@ func NewQueryRangeMetrics(metricType string, scope tally.Scope) QueryDurationMetrics {
}
}

// QueryDurationMetrics are metrics for query runtime by duration.
// QueryDurationMetrics are timing metrics bucketed by the query window (start, end).
type QueryDurationMetrics interface {
// Record records the runtime for queries given their range.
Record(queryRange time.Duration, queryRuntime time.Duration)
@@ -139,8 +153,8 @@ func NewCardinalityMetrics(metricType string, scope tally.Scope) QueryCardinalityMetrics {
}
}

// QueryCardinalityMetrics are metrics for query runtime by cardinality.
// QueryCardinalityMetrics are metrics bucketed by the # of documents returned by a query result.
type QueryCardinalityMetrics interface {
// Record records the runtime for queries given their cardinality.
Record(seriesCount int, queryRuntime time.Duration)
Record(docCount int, queryRuntime time.Duration)
}
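
For existing callers, the practical change is swapping the two separate constructors that used to live in package limits for the single composite now exported from package index. A sketch, assuming a caller that previously held both views side by side as the removed code in index.go did; the metric name and durations are illustrative.

package main

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage/index"
	"github.com/uber-go/tally"
)

// Before this commit (package limits, two separately held metrics):
//
//	byRange := limits.NewQueryRangeMetrics("query_total", scope)
//	byDocs := limits.NewCardinalityMetrics("query_total", scope)
//
// After this commit (package index, one composite):
func main() {
	scope := tally.NoopScope
	queryTotal := index.NewQueryMetrics("query_total", scope)

	queryTotal.ByRange.Record(6*time.Hour, 250*time.Millisecond)
	queryTotal.ByDocs.Record(5000, 250*time.Millisecond)
}
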
