From ab96596f57eb9440ae257c67286aa5cf2fe69875 Mon Sep 17 00:00:00 2001 From: Harshil Goel <54325286+harshil-goel@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:30:40 +0530 Subject: [PATCH] perf(core): Optimize reading data during mutation for scalar types (#9290) --- posting/lists.go | 46 ++++++++++++++++++++++++++++++++-------------- posting/mvcc.go | 11 +++++++++++ posting/oracle.go | 23 +++++++++++++++++++++++ worker/mutation.go | 24 ++++++++++++------------ x/metrics.go | 10 ++++++++++ 5 files changed, 88 insertions(+), 26 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index aed314be4b7..4889b8a6a18 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,9 +18,13 @@ package posting import ( "bytes" + "context" "fmt" "sync" + "time" + ostats "go.opencensus.io/stats" + "go.opencensus.io/tag" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -300,6 +304,33 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) { + start := time.Now() + defer func() { + pk, _ := x.Parse(key) + ms := x.SinceMs(start) + var tags []tag.Mutator + tags = append(tags, tag.Upsert(x.KeyMethod, "get")) + tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr)) + _ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms)) + }() + + pl := &pb.PostingList{} + txn := pstore.NewTransactionAt(lc.startTs, false) + defer txn.Discard() + + item, err := txn.Get(key) + if err != nil { + return nil, err + } + + err = item.Value(func(val []byte) error { + return proto.Unmarshal(val, pl) + }) + + return pl, err +} + // GetSinglePosting retrieves the cached version of the first item in the list associated with the // given key. This is used for retrieving the value of a scalar predicats. func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { @@ -332,20 +363,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { return pl, err } - pl = &pb.PostingList{} - txn := pstore.NewTransactionAt(lc.startTs, false) - defer txn.Discard() - - item, err := txn.Get(key) - if err != nil { - return nil, err - } - - err = item.Value(func(val []byte) error { - return proto.Unmarshal(val, pl) - }) - - return pl, err + return lc.readPostingListAt(key) } pl, err := getPostings() diff --git a/posting/mvcc.go b/posting/mvcc.go index f707bd0021c..a77a2758485 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -29,6 +29,7 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" ostats "go.opencensus.io/stats" + "go.opencensus.io/tag" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -547,6 +548,16 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { if err != nil { return nil, errors.Wrapf(err, "while reading posting list with key [%v]", key) } + + start := time.Now() + defer func() { + ms := x.SinceMs(start) + var tags []tag.Mutator + tags = append(tags, tag.Upsert(x.KeyMethod, "iterate")) + tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr)) + _ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms)) + }() + if pk.HasStartUid { // Trying to read a single part of a multi part list. This type of list // should be read using using the main key because the information needed diff --git a/posting/oracle.go b/posting/oracle.go index a47d1e52a5e..b058288dada 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + "github.com/dgraph-io/badger/v4" "github.com/golang/glog" ostats "go.opencensus.io/stats" @@ -163,6 +164,28 @@ func (txn *Txn) GetFromDelta(key []byte) (*List, error) { return txn.cache.GetFromDelta(key) } +func (txn *Txn) GetScalarList(key []byte) (*List, error) { + l, err := txn.cache.GetFromDelta(key) + if err != nil { + return nil, err + } + if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 { + pl, err := txn.cache.readPostingListAt(key) + if err == badger.ErrKeyNotFound { + return l, nil + } + if err != nil { + return nil, err + } + if pl.CommitTs == 0 { + l.mutationMap.setCurrentEntries(txn.StartTs, pl) + } else { + l.mutationMap.insertCommittedPostings(pl) + } + } + return l, nil +} + // Update calls UpdateDeltasAndDiscardLists on the local cache. func (txn *Txn) Update() { txn.cache.UpdateDeltasAndDiscardLists() diff --git a/worker/mutation.go b/worker/mutation.go index 79b3584185b..1bc13f08061 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -90,25 +90,25 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e // The following is a performance optimization which allows us to not read a posting list from // disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if // we're not using the read posting list, we don't need to retrieve it. We need the posting list - // if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can - // just create a posting list facade in memory and use it to store the delta in Badger. Later, - // the rollup operation would consolidate all these deltas into a posting list. + // if we're doing count index or delete operation. For scalar predicates, we just get the last item merged. + // In other cases, we can just create a posting list facade in memory and use it to store the delta in Badger. + // Later, the rollup operation would consolidate all these deltas into a posting list. + isList := su.GetList() var getFn func(key []byte) (*posting.List, error) switch { - case len(su.GetTokenizer()) > 0 || su.GetCount(): - // Any index or count index. - getFn = txn.Get - case su.GetValueType() == pb.Posting_UID && !su.GetList(): - // Single UID, not a list. + case len(edge.Lang) == 0 && !isList: + // Scalar Predicates, without lang + getFn = txn.GetScalarList + case len(edge.Lang) > 0 || su.GetCount(): + // Language or Count Index getFn = txn.Get case edge.Op == pb.DirectedEdge_DEL: // Covers various delete cases to keep things simple. getFn = txn.Get default: - // Reverse index doesn't need the posting list to be read. We already covered count index, - // single uid and delete all above. - // Values, whether single or list, don't need to be read. - // Uid list doesn't need to be read. + // Only count index needs to be read. For other indexes on list, we don't need to read any data. + // For indexes on scalar prediactes, only the last element needs to be left. + // Delete cases covered above. getFn = txn.GetFromDelta } diff --git a/x/metrics.go b/x/metrics.go index 3c19868b77c..11080f1e12f 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -71,6 +71,9 @@ var ( // LatencyMs is the latency of the various Dgraph operations. LatencyMs = ostats.Float64("latency", "Latency of the various methods", ostats.UnitMilliseconds) + // BadgerReadLatencyMs is the latency of various different predicate reads from badger. + BadgerReadLatencyMs = ostats.Float64("badger_read_latency_ms", + "Latency of the various methods", ostats.UnitMilliseconds) // Point-in-time metrics. @@ -201,6 +204,13 @@ var ( Aggregation: defaultLatencyMsDistribution, TagKeys: allTagKeys, }, + { + Name: BadgerReadLatencyMs.Name(), + Measure: BadgerReadLatencyMs, + Description: BadgerReadLatencyMs.Description(), + Aggregation: defaultLatencyMsDistribution, + TagKeys: allTagKeys, + }, { Name: NumQueries.Name(), Measure: NumQueries,