Skip to content

Commit

Permalink
perf(core): Optimize reading data during mutation for scalar types (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel authored Jan 31, 2025
1 parent 2aca89a commit ab96596
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 26 deletions.
46 changes: 32 additions & 14 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package posting

import (
"bytes"
"context"
"fmt"
"sync"
"time"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
Expand Down Expand Up @@ -300,6 +304,33 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
start := time.Now()
defer func() {
pk, _ := x.Parse(key)
ms := x.SinceMs(start)
var tags []tag.Mutator
tags = append(tags, tag.Upsert(x.KeyMethod, "get"))
tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr))
_ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms))
}()

pl := &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
defer txn.Discard()

item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
return proto.Unmarshal(val, pl)
})

return pl, err
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
Expand Down Expand Up @@ -332,20 +363,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
return pl, err
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
defer txn.Discard()

item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
return proto.Unmarshal(val, pl)
})

return pl, err
return lc.readPostingListAt(key)
}

pl, err := getPostings()
Expand Down
11 changes: 11 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
Expand Down Expand Up @@ -547,6 +548,16 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list with key [%v]", key)
}

start := time.Now()
defer func() {
ms := x.SinceMs(start)
var tags []tag.Mutator
tags = append(tags, tag.Upsert(x.KeyMethod, "iterate"))
tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr))
_ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms))
}()

if pk.HasStartUid {
// Trying to read a single part of a multi part list. This type of list
// should be read using using the main key because the information needed
Expand Down
23 changes: 23 additions & 0 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/golang/glog"
ostats "go.opencensus.io/stats"

Expand Down Expand Up @@ -163,6 +164,28 @@ func (txn *Txn) GetFromDelta(key []byte) (*List, error) {
return txn.cache.GetFromDelta(key)
}

func (txn *Txn) GetScalarList(key []byte) (*List, error) {
l, err := txn.cache.GetFromDelta(key)
if err != nil {
return nil, err
}
if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 {
pl, err := txn.cache.readPostingListAt(key)
if err == badger.ErrKeyNotFound {
return l, nil
}
if err != nil {
return nil, err
}
if pl.CommitTs == 0 {
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
} else {
l.mutationMap.insertCommittedPostings(pl)
}
}
return l, nil
}

// Update calls UpdateDeltasAndDiscardLists on the local cache.
func (txn *Txn) Update() {
txn.cache.UpdateDeltasAndDiscardLists()
Expand Down
24 changes: 12 additions & 12 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,25 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
// The following is a performance optimization which allows us to not read a posting list from
// disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if
// we're not using the read posting list, we don't need to retrieve it. We need the posting list
// if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can
// just create a posting list facade in memory and use it to store the delta in Badger. Later,
// the rollup operation would consolidate all these deltas into a posting list.
// if we're doing count index or delete operation. For scalar predicates, we just get the last item merged.
// In other cases, we can just create a posting list facade in memory and use it to store the delta in Badger.
// Later, the rollup operation would consolidate all these deltas into a posting list.
isList := su.GetList()
var getFn func(key []byte) (*posting.List, error)
switch {
case len(su.GetTokenizer()) > 0 || su.GetCount():
// Any index or count index.
getFn = txn.Get
case su.GetValueType() == pb.Posting_UID && !su.GetList():
// Single UID, not a list.
case len(edge.Lang) == 0 && !isList:
// Scalar Predicates, without lang
getFn = txn.GetScalarList
case len(edge.Lang) > 0 || su.GetCount():
// Language or Count Index
getFn = txn.Get
case edge.Op == pb.DirectedEdge_DEL:
// Covers various delete cases to keep things simple.
getFn = txn.Get
default:
// Reverse index doesn't need the posting list to be read. We already covered count index,
// single uid and delete all above.
// Values, whether single or list, don't need to be read.
// Uid list doesn't need to be read.
// Only count index needs to be read. For other indexes on list, we don't need to read any data.
// For indexes on scalar prediactes, only the last element needs to be left.
// Delete cases covered above.
getFn = txn.GetFromDelta
}

Expand Down
10 changes: 10 additions & 0 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ var (
// LatencyMs is the latency of the various Dgraph operations.
LatencyMs = ostats.Float64("latency",
"Latency of the various methods", ostats.UnitMilliseconds)
// BadgerReadLatencyMs is the latency of various different predicate reads from badger.
BadgerReadLatencyMs = ostats.Float64("badger_read_latency_ms",
"Latency of the various methods", ostats.UnitMilliseconds)

// Point-in-time metrics.

Expand Down Expand Up @@ -201,6 +204,13 @@ var (
Aggregation: defaultLatencyMsDistribution,
TagKeys: allTagKeys,
},
{
Name: BadgerReadLatencyMs.Name(),
Measure: BadgerReadLatencyMs,
Description: BadgerReadLatencyMs.Description(),
Aggregation: defaultLatencyMsDistribution,
TagKeys: allTagKeys,
},
{
Name: NumQueries.Name(),
Measure: NumQueries,
Expand Down

0 comments on commit ab96596

Please sign in to comment.