Skip to content

Commit

Permalink
Add more opencensus tracing in query and commit endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain committed Nov 10, 2018
1 parent dada74f commit 9e7fa05
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 63 deletions.
56 changes: 19 additions & 37 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"fmt"
"math"
"math/rand"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -326,15 +325,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
return empty, err
}

func annotateStartTs(span *otrace.Span, ts uint64) {
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "")
}

func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) {
ctx, span := otrace.StartSpan(ctx, "Server.Mutate")
defer span.End()

resp = &api.Assigned{}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
}
return resp, err
}

Expand All @@ -344,18 +344,14 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
if mu.StartTs == 0 {
mu.StartTs = State.getTimestamp(false)
}
annotateStartTs(span, mu.StartTs)
emptyMutation :=
len(mu.GetSetJson()) == 0 && len(mu.GetDeleteJson()) == 0 &&
len(mu.Set) == 0 && len(mu.Del) == 0 &&
len(mu.SetNquads) == 0 && len(mu.DelNquads) == 0
if emptyMutation {
return resp, fmt.Errorf("empty mutation")
}
if rand.Float64() < worker.Config.Tracing {
var tr trace.Trace
tr, ctx = x.NewTrace("Server.Mutate", ctx)
defer tr.Finish()
}

var l query.Latency
l.Start = time.Now()
Expand Down Expand Up @@ -387,6 +383,7 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
Edges: edges,
StartTs: mu.StartTs,
}
span.Annotate(nil, "Applying mutations")
resp.Context, err = query.ApplyMutations(ctx, m)
if !mu.CommitNow {
if err == y.ErrConflict {
Expand All @@ -412,16 +409,11 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
}
return resp, err
}
tr, ok := trace.FromContext(ctx)
if ok {
tr.LazyPrintf("Prewrites err: %v. Attempting to commit/abort immediately.", err)
}
span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err)
ctxn := resp.Context
// zero would assign the CommitTs
cts, err := worker.CommitOverNetwork(ctx, ctxn)
if ok {
tr.LazyPrintf("Status of commit at ts: %d: %v", ctxn.StartTs, err)
}
span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err)
if err != nil {
if err == y.ErrAborted {
err = status.Errorf(codes.Aborted, err.Error())
Expand All @@ -441,6 +433,9 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
}
ctx, span := otrace.StartSpan(ctx, "Server.Query")
defer span.End()

if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
Expand All @@ -455,25 +450,15 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
return resp, ctx.Err()
}

if rand.Float64() < worker.Config.Tracing {
var tr trace.Trace
tr, ctx = x.NewTrace("GrpcQuery", ctx)
defer tr.Finish()
}

resp = new(api.Response)
if len(req.Query) == 0 {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Empty query")
}
span.Annotate(nil, "Empty query")
return resp, fmt.Errorf("empty query")
}

var l query.Latency
l.Start = time.Now()
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Query request received: %v", req)
}
span.Annotatef(nil, "Query received: %v", req)

parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Expand All @@ -489,6 +474,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
resp.Txn = &api.TxnContext{
StartTs: req.StartTs,
}
annotateStartTs(span, req.StartTs)

var queryRequest = query.QueryRequest{
Latency: &l,
Expand All @@ -499,18 +485,12 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
// Core processing happens here.
var er query.ExecuteResult
if er, err = queryRequest.Process(ctx); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing query: %+v", err)
}
return resp, x.Wrap(err)
}
resp.Schema = er.SchemaNode

json, err := query.ToJson(&l, er.Subgraphs)
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while converting to protocol buffer: %+v", err)
}
return resp, err
}
resp.Json = json
Expand All @@ -525,8 +505,10 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
return resp, err
}

func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext,
error) {
func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort")
defer span.End()

if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
Expand All @@ -535,10 +517,10 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx
}

tctx := &api.TxnContext{}

if tc.StartTs == 0 {
return &api.TxnContext{}, fmt.Errorf("StartTs cannot be zero while committing a transaction.")
}
annotateStartTs(span, tc.StartTs)
commitTs, err := worker.CommitOverNetwork(ctx, tc)
if err == y.ErrAborted {
tctx.Aborted = true
Expand Down
17 changes: 9 additions & 8 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/trace"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -1838,6 +1839,9 @@ func getReversePredicates(ctx context.Context) ([]string, error) {
// ProcessGraph processes the SubGraph instance accumulating result for the query
// from different instances. Note: taskQuery is nil for root node.
func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
ctx, span := otrace.StartSpan(ctx, "query.ProcessGraph")
defer span.End()

if sg.Attr == "uid" {
// We dont need to call ProcessGraph for uid, as we already have uids
// populated from parent and there is nothing to process but uidMatrix
Expand Down Expand Up @@ -2432,6 +2436,9 @@ type QueryRequest struct {
// Fills Subgraphs and Vars.
// It optionally also returns a map of the allocated uids in case of an upsert request.
func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
ctx, span := otrace.StartSpan(ctx, "query.ProcessQuery")
defer span.End()

// doneVars stores the processed variables.
req.vars = make(map[string]varValue)
loopStart := time.Now()
Expand All @@ -2441,11 +2448,7 @@ func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {

if gq == nil || (len(gq.UID) == 0 && gq.Func == nil && len(gq.NeedsVar) == 0 &&
gq.Alias != "shortest" && !gq.IsEmpty) {
err := x.Errorf("Invalid query, query pb.id is zero and generator is nil")
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf(err.Error())
}
return err
return x.Errorf("Invalid query, query pb.id is zero and generator is nil")
}
sg, err := ToSubGraph(ctx, gq)
if err != nil {
Expand All @@ -2454,9 +2457,7 @@ func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
sg.recurse(func(sg *SubGraph) {
sg.ReadTs = req.ReadTs
})
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Query parsed")
}
span.Annotate(nil, "Query parsed")
req.Subgraphs = append(req.Subgraphs, sg)
}
req.Latency.Parsing += time.Since(loopStart)
Expand Down
11 changes: 10 additions & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin
if err := runSchemaMutationHelper(ctx, update, startTs); err != nil {
// on error, we restore the memory state to be the same as the disk
maxRetries := 10
loadErr := x.RetryUntilSuccess(maxRetries, 10 * time.Millisecond, func() error {
loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error {
return schema.Load(update.Predicate)
})

Expand Down Expand Up @@ -553,12 +553,21 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e

// CommitOverNetwork makes a proxy call to Zero to commit or abort a transaction.
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) {
ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork")
defer span.End()

pl := groups().Leader(0)
if pl == nil {
return 0, conn.ErrNoConnection
}
zc := pb.NewZeroClient(pl.Get())
tctx, err := zc.CommitOrAbort(ctx, tc)

var attributes []otrace.Attribute
attributes = append(attributes, otrace.Int64Attribute("commitTs", int64(tctx.CommitTs)))
attributes = append(attributes, otrace.BoolAttribute("committed", tctx.CommitTs > 0))
span.Annotatef(attributes, "Error=%v", err)

if err != nil {
return 0, err
}
Expand Down
7 changes: 7 additions & 0 deletions worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package worker

import (
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"

Expand All @@ -41,6 +42,9 @@ type resultErr struct {
// predicates is not specified, then all the predicates belonging to the group
// are returned
func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, error) {
ctx, span := otrace.StartSpan(ctx, "worker.getSchema")
defer span.End()

var result pb.SchemaResult
var predicates []string
var fields []string
Expand Down Expand Up @@ -162,6 +166,9 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest,
// GetSchemaOverNetwork checks which group should be serving the schema
// according to fingerprint of the predicate and sends it to that instance.
func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ([]*api.SchemaNode, error) {
ctx, span := otrace.StartSpan(ctx, "worker.GetSchemaOverNetwork")
defer span.End()

if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
Expand Down
28 changes: 11 additions & 17 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"errors"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
otrace "go.opencensus.io/trace"

cindex "github.com/google/codesearch/index"
cregexp "github.com/google/codesearch/regexp"
Expand Down Expand Up @@ -149,22 +149,14 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error
}

result, err := processWithBackupRequest(ctx, gid, func(ctx context.Context, c pb.WorkerClient) (interface{}, error) {
if tr, ok := trace.FromContext(ctx); ok {
id := fmt.Sprintf("%d", rand.Int())
tr.LazyPrintf("Sending request to server, id: %s", id)
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("trace", id))
}
return c.ServeTask(ctx, q)
})
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while worker.ServeTask: %v", err)
}
return nil, err
}
reply := result.(*pb.Result)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Reply from server. length: %v Group: %v Attr: %v", len(reply.UidMatrix), gid, attr)
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "Reply from server. length: %v Group: %v Attr: %v", len(reply.UidMatrix), gid, attr)
}
return reply, nil
}
Expand Down Expand Up @@ -599,11 +591,13 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti

// processTask processes the query, accumulates and returns the result.
func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
span := otrace.FromContext(ctx)
span.Annotate(nil, "Waiting for startTs")
if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
return &emptyResult, err
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Done waiting for maxPending to catch up for Attr %q, readTs: %d\n",
if span != nil {
span.Annotatef(nil, "Done waiting for maxPending to catch up for Attr %q, readTs: %d\n",
q.Attr, q.ReadTs)
}

Expand Down Expand Up @@ -1318,6 +1312,9 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) {

// ServeTask is used to respond to a query.
func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) {
ctx, span := otrace.StartSpan(ctx, "worker.ServeTask")
defer span.End()

if ctx.Err() != nil {
return &emptyResult, ctx.Err()
}
Expand All @@ -1327,12 +1324,9 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
if q.UidList != nil {
numUids = len(q.UidList.Uids)
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid)
}
span.Annotatef(nil, "Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid)

if !groups().ServesGroup(gid) {
// TODO(pawan) - Log this when we have debug logs.
return nil, fmt.Errorf("Temporary error, attr: %q groupId: %v Request sent to wrong server",
q.Attr, gid)
}
Expand Down

0 comments on commit 9e7fa05

Please sign in to comment.