Skip to content

Commit

Permalink
[dbnode] Add metric for series blocks read in FetchTagged endpoint (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles authored Feb 16, 2021
1 parent 70d7b9b commit d4ff5a6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestFetchResultIterTest(t *testing.T) {
nowFn: time.Now,
dataReadMetrics: index.NewQueryMetrics("", scope),
totalMetrics: index.NewQueryMetrics("", scope),
seriesBlocks: scope.Histogram("series-blocks", tally.MustMakeExponentialValueBuckets(10, 2, 5)),
instrumentClose: func(err error) {},
})
total := 0
Expand All @@ -76,6 +77,7 @@ func TestFetchResultIterTest(t *testing.T) {
require.Equal(t, 10, total)
require.Equal(t, 5, blockPermits.acquired)
require.Equal(t, 5, blockPermits.released)
requireSeriesBlockMetric(t, scope)
}

func TestFetchResultIterTestUnsetBlocksPerBatch(t *testing.T) {
Expand All @@ -100,6 +102,7 @@ func TestFetchResultIterTestUnsetBlocksPerBatch(t *testing.T) {
nowFn: time.Now,
dataReadMetrics: index.NewQueryMetrics("", scope),
totalMetrics: index.NewQueryMetrics("", scope),
seriesBlocks: scope.Histogram("series-blocks", tally.MustMakeExponentialValueBuckets(10, 2, 5)),
instrumentClose: func(err error) {},
})
total := 0
Expand All @@ -114,6 +117,18 @@ func TestFetchResultIterTestUnsetBlocksPerBatch(t *testing.T) {
require.Equal(t, 10, total)
require.Equal(t, 10, blockPermits.acquired)
require.Equal(t, 10, blockPermits.released)
requireSeriesBlockMetric(t, scope)
}

func requireSeriesBlockMetric(t *testing.T, scope tally.TestScope) {
values, ok := scope.Snapshot().Histograms()["series-blocks+"]
require.True(t, ok)

sum := 0
for _, count := range values.Values() {
sum += int(count)
}
require.Equal(t, 1, sum)
}

func setup(mocks *gomock.Controller) (
Expand Down
26 changes: 18 additions & 8 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ type serviceMetrics struct {
queryTimingAggregate index.QueryMetrics
// the total time to call AggregateRaw.
queryTimingAggregateRaw index.QueryMetrics
// the series blocks read during a call to fetchTagged
fetchTaggedSeriesBlocks tally.Histogram
}

func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceMetrics {
buckets := append(tally.ValueBuckets{0}, tally.MustMakeExponentialValueBuckets(100, 2, 16)...)
return serviceMetrics{
fetch: instrument.NewMethodMetrics(scope, "fetch", opts),
fetchTagged: instrument.NewMethodMetrics(scope, "fetchTagged", opts),
Expand Down Expand Up @@ -165,6 +168,7 @@ func newServiceMetrics(scope tally.Scope, opts instrument.TimerOptions) serviceM
queryTimingAggregate: index.NewQueryMetrics("aggregate", scope),
queryTimingAggregateRaw: index.NewQueryMetrics("aggregate_raw", scope),
queryTimingDataRead: index.NewQueryMetrics("data_read", scope),
fetchTaggedSeriesBlocks: scope.Histogram("fetchTagged-seriesBlocks", buckets),
}
}

Expand Down Expand Up @@ -860,6 +864,7 @@ func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
dataReadMetrics: s.metrics.queryTimingDataRead,
totalMetrics: s.metrics.queryTimingFetchTagged,
blocksPerBatch: s.opts.FetchTaggedSeriesBlocksPerBatch(),
seriesBlocks: s.metrics.fetchTaggedSeriesBlocks,
}), nil
}

Expand Down Expand Up @@ -894,14 +899,15 @@ type FetchTaggedResultsIter interface {

type fetchTaggedResultsIter struct {
fetchTaggedResultsIterOpts
idResults []idResult
idx int
blockReadIdx int
cur IDResult
err error
batchesAcquired int
blocksAvailable int
dataReadStart time.Time
idResults []idResult
idx int
blockReadIdx int
cur IDResult
err error
batchesAcquired int
blocksAvailable int
dataReadStart time.Time
totalSeriesBlocks int
}

type fetchTaggedResultsIterOpts struct {
Expand All @@ -921,6 +927,7 @@ type fetchTaggedResultsIterOpts struct {
totalDocsCount int
dataReadMetrics index.QueryMetrics
totalMetrics index.QueryMetrics
seriesBlocks tally.Histogram
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
Expand Down Expand Up @@ -994,6 +1001,7 @@ func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool {

for blockIter.Next(ctx) {
curr := blockIter.Current()
i.totalSeriesBlocks++
currResult.blockReaders = append(currResult.blockReaders, curr)
acquired, err := i.acquire(ctx, i.blockReadIdx)
if err != nil {
Expand Down Expand Up @@ -1075,6 +1083,8 @@ func (i *fetchTaggedResultsIter) Close(err error) {
i.totalMetrics.ByRange.Record(queryRange, totalFetchTime)
i.totalMetrics.ByDocs.Record(i.totalDocsCount, totalFetchTime)

i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks))

for n := 0; n < i.batchesAcquired; n++ {
i.blockPermits.Release()
}
Expand Down

0 comments on commit d4ff5a6

Please sign in to comment.