Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move instrumentation cleanup to FetchTaggedResultIterator Close() #3173

Merged
merged 5 commits into from
Feb 5, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 72 additions & 43 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ type Service interface {
rpc.TChanNode

// FetchTaggedIter returns an iterator for the results of FetchTagged.
// It is the responsibility of the caller to close the returned iterator.
FetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedRequest) (FetchTaggedResultsIter, error)

// Only safe to be called one time once the service has started.
Expand Down Expand Up @@ -722,35 +723,19 @@ func (s *service) readDatapoints(
}

func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) {
callStart := s.nowFn()

ctx := addSourceToContext(tctx, req.Source)
ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchTagged)
if sampled {
sp.LogFields(
opentracinglog.String("query", string(req.Query)),
opentracinglog.String("namespace", string(req.NameSpace)),
xopentracing.Time("start", time.Unix(0, req.RangeStart)),
xopentracing.Time("end", time.Unix(0, req.RangeEnd)),
)
}

result, err := s.fetchTagged(ctx, req)
if sampled && err != nil {
sp.LogFields(opentracinglog.Error(err))
}
sp.Finish()

s.metrics.fetchTagged.ReportSuccessOrError(err, s.nowFn().Sub(callStart))
return result, err
}

func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) {
ctx := tchannelthrift.Context(tctx)
iter, err := s.FetchTaggedIter(ctx, req)
if err != nil {
return nil, err
}
result, err := s.buildFetchTaggedResult(ctx, iter)
iter.Close(err)

return result, err
}

func (s *service) buildFetchTaggedResult(ctx context.Context, iter FetchTaggedResultsIter) (*rpc.FetchTaggedResult_,
error) {
response := &rpc.FetchTaggedResult_{
Elements: make([]*rpc.FetchTaggedIDResult_, 0, iter.NumIDs()),
Exhaustive: iter.Exhaustive(),
Expand Down Expand Up @@ -781,6 +766,35 @@ func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest)
}

func (s *service) FetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedRequest) (FetchTaggedResultsIter, error) {
callStart := s.nowFn()
ctx = addSourceToM3Context(ctx, req.Source)
ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchTagged)
if sampled {
sp.LogFields(
opentracinglog.String("query", string(req.Query)),
opentracinglog.String("namespace", string(req.NameSpace)),
xopentracing.Time("start", time.Unix(0, req.RangeStart)),
xopentracing.Time("end", time.Unix(0, req.RangeEnd)),
)
}

instrumentClose := func(err error) {
if sampled && err != nil {
sp.LogFields(opentracinglog.Error(err))
}
sp.Finish()

s.metrics.fetchTagged.ReportSuccessOrError(err, s.nowFn().Sub(callStart))
}
iter, err := s.fetchTaggedIter(ctx, req, instrumentClose)
if err != nil {
instrumentClose(err)
}
return iter, err
}

func (s *service) fetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedRequest, instrumentClose func(error)) (
FetchTaggedResultsIter, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting is odd here

db, err := s.startReadRPCWithDB()
if err != nil {
return nil, err
Expand All @@ -803,15 +817,16 @@ func (s *service) FetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedReque
ctx.RegisterFinalizer(tagEncoder)

return newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{
queryResult: queryResult,
queryOpts: opts,
fetchData: fetchData,
db: db,
docReader: docs.NewEncodedDocumentReader(),
nsID: ns,
tagEncoder: tagEncoder,
iOpts: s.opts.InstrumentOptions(),
blockReadLimit: s.queryLimits.DiskSeriesReadLimit(),
queryResult: queryResult,
queryOpts: opts,
fetchData: fetchData,
db: db,
docReader: docs.NewEncodedDocumentReader(),
nsID: ns,
tagEncoder: tagEncoder,
iOpts: s.opts.InstrumentOptions(),
blockReadLimit: s.queryLimits.DiskSeriesReadLimit(),
instrumentClose: instrumentClose,
}), nil
}

Expand All @@ -838,6 +853,10 @@ type FetchTaggedResultsIter interface {

// Current returns the current IDResult fetched with Next. The result is only valid if Err is nil.
Current() IDResult

// Close closes the iterator. The provided error is non-nil if the client of the Iterator encountered an error
// while iterating.
Close(err error)
}

type fetchTaggedResultsIter struct {
Expand All @@ -857,18 +876,20 @@ type fetchTaggedResultsIter struct {
tagEncoder serialize.TagEncoder
iOpts instrument.Options
blocksReadLimit limits.LookbackLimit
instrumentClose func(error)
}

type fetchTaggedResultsIterOpts struct {
queryResult index.QueryResult
queryOpts index.QueryOptions
fetchData bool
db storage.Database
docReader *docs.EncodedDocumentReader
nsID ident.ID
tagEncoder serialize.TagEncoder
iOpts instrument.Options
blockReadLimit limits.LookbackLimit
queryResult index.QueryResult
queryOpts index.QueryOptions
fetchData bool
db storage.Database
docReader *docs.EncodedDocumentReader
nsID ident.ID
tagEncoder serialize.TagEncoder
iOpts instrument.Options
blockReadLimit limits.LookbackLimit
instrumentClose func(error)
}

func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic
Expand All @@ -885,6 +906,7 @@ func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResul
tagEncoder: opts.tagEncoder,
iOpts: opts.iOpts,
blocksReadLimit: opts.blockReadLimit,
instrumentClose: opts.instrumentClose,
}

return iter
Expand Down Expand Up @@ -968,6 +990,10 @@ func (i *fetchTaggedResultsIter) Current() IDResult {
return i.cur
}

func (i *fetchTaggedResultsIter) Close(err error) {
i.instrumentClose(err)
}

// IDResult is the FetchTagged result for a series ID.
type IDResult interface {
// ID returns the series ID.
Expand Down Expand Up @@ -2800,7 +2826,10 @@ func finalizeAnnotationFn(b []byte) {
}

func addSourceToContext(tctx thrift.Context, source []byte) context.Context {
ctx := tchannelthrift.Context(tctx)
return addSourceToM3Context(tchannelthrift.Context(tctx), source)
}

func addSourceToM3Context(ctx context.Context, source []byte) context.Context {
if len(source) > 0 {
if base, ok := ctx.GoContext(); ok {
ctx.SetGoContext(goctx.WithValue(base, limits.SourceContextKey, source))
Expand Down