Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Dgraph): Add experimental cache for posting lists #6245

Merged
merged 10 commits into from
Aug 24, 2020
6 changes: 6 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ they form a Raft group and provide synchronous replication.
flag.Bool("ludicrous_mode", false, "Run alpha in ludicrous mode")
flag.Bool("graphql_extensions", true, "Set to false if extensions not required in GraphQL response body")
flag.Duration("graphql_poll_interval", time.Second, "polling interval for graphql subscription.")

// Flag to enable the experimental posting list Ristretto cache. A value of zero
// (the default) disables the cache.
flag.Int("pl_cachemb", 0, "EXPERIMENTAL. Size of the posting list cache in MBs. "+
	"Cache will be disabled if this value is zero. "+
	"Right now this cache is not guaranteed to be consistent, hence it's an experimental flag.")
}

func setupCustomTokenizers() {
Expand Down Expand Up @@ -637,6 +642,7 @@ func run() {
AbortOlderThan: abortDur,
StartTime: startTime,
LudicrousMode: Alpha.Conf.GetBool("ludicrous_mode"),
PlCacheMb: uint32(Alpha.Conf.GetInt("pl_cachemb")),
}
if x.WorkerConfig.EncryptionKey, err = enc.ReadKey(Alpha.Conf); err != nil {
glog.Infof("unable to read key %v", err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/dgraph-io/badger/v2 v2.2007.1
github.com/dgraph-io/dgo/v200 v200.0.0-20200805103119-a3544c464dd6
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgraph-io/ristretto v0.0.4-0.20200819184049-5d96a860defa
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1:
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200819184049-5d96a860defa h1:kcB79KfVlRDPLuXxmCaHoPCiEk8GUUU9QeokLYqozSA=
github.com/dgraph-io/ristretto v0.0.4-0.20200819184049-5d96a860defa/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down
29 changes: 29 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
Expand Down Expand Up @@ -131,13 +132,41 @@ func updateMemoryMetrics(lc *y.Closer) {
var (
pstore *badger.DB
closer *y.Closer
lCache *ristretto.Cache
)

// Init initializes the posting lists package. It records the Badger store, starts the
// memory metrics goroutine and, when x.WorkerConfig.PlCacheMb is non-zero, creates the
// experimental Ristretto cache for posting lists along with a goroutine that logs the
// cache metrics once a minute.
//
// When PlCacheMb is zero the function returns early and lCache is left nil, i.e. the
// cache is disabled.
func Init(ps *badger.DB) {
	pstore = ps
	closer = y.NewCloser(1)
	go updateMemoryMetrics(closer)

	// Initialize cache.
	if x.WorkerConfig.PlCacheMb == 0 {
		return
	}

	// PlCacheMb is a uint32. Widen it to int64 BEFORE scaling to bytes; doing the
	// multiplication in uint32 wraps around for any size of 4096 MB or more and would
	// silently configure a much smaller (possibly zero) cache.
	maxCost := int64(x.WorkerConfig.PlCacheMb) * 1024 * 1024

	var err error
	lCache, err = ristretto.NewCache(&ristretto.Config{
		NumCounters: 200e6,
		MaxCost:     maxCost,
		BufferItems: 64,
		Metrics:     true,
		// Cost charges each cached *List by its DeepSize; any other value type is
		// treated as having no cost.
		Cost: func(val interface{}) int64 {
			l, ok := val.(*List)
			if !ok {
				return int64(0)
			}
			return int64(l.DeepSize())
		},
	})
	x.Check(err)

	// Periodically log the cache metrics.
	// NOTE(review): this goroutine has no stop signal and runs for the lifetime of the
	// process; it is not tied to closer.
	go func() {
		m := lCache.Metrics
		ticker := time.NewTicker(60 * time.Second)
		for range ticker.C {
			glog.Infof("Posting list cache metrics: %s", m)
		}
	}()
}

// Cleanup waits until the closer has finished processing.
Expand Down
56 changes: 55 additions & 1 deletion posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -74,6 +75,9 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
if err != nil {
return err
}
// Clear the list from the cache after a rollup.
RemoveCacheFor(key)

const N = uint64(1000)
if glog.V(2) {
if count := atomic.AddUint64(&ir.count, 1); count%N == 0 {
Expand Down Expand Up @@ -254,6 +258,30 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
return nil
}

// ResetCache clears every entry from the package-level posting list cache (lCache)
// by calling Clear on it.
// NOTE(review): lCache is only assigned in Init when PlCacheMb is non-zero, so it is
// nil when the cache is disabled. This call presumably relies on ristretto's Cache
// methods tolerating a nil receiver — confirm.
func ResetCache() {
lCache.Clear()
}

// RemoveCacheFor deletes the entry stored under the given key from the package-level
// posting list cache (lCache), so a later lookup for that key will not be served from
// the cached copy.
// NOTE(review): lCache is nil when the cache is disabled (see Init); this presumably
// relies on ristretto's Del being a no-op on a nil cache — confirm.
func RemoveCacheFor(key []byte) {
lCache.Del(key)
}

// RemoveCachedKeys evicts from the posting list cache every key that has an entry in
// this transaction's deltas. It does nothing when the receiver is nil or when the
// transaction has no local cache.
func (txn *Txn) RemoveCachedKeys() {
	if txn != nil && txn.cache != nil {
		for k := range txn.cache.deltas {
			lCache.Del(k)
		}
	}
}

// WaitForCache calls Wait on the package-level posting list cache (lCache).
// NOTE(review): presumably this blocks until the cache has applied the Set/Del
// operations buffered so far, so that earlier deletions are visible before the caller
// proceeds — confirm against the ristretto documentation. lCache is nil when the cache
// is disabled (see Init), so this also assumes Wait tolerates a nil receiver.
func WaitForCache() {
lCache.Wait()
}

func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
if plist == nil {
return errors.Errorf("cannot unmarshal value to a nil posting list of key %s",
Expand Down Expand Up @@ -366,6 +394,27 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
cachedVal, ok := lCache.Get(key)
if ok {
l, ok := cachedVal.(*List)
if ok && l != nil {
// No need to clone the immutable layer or the key since mutations will not modify it.
lCopy := &List{
minTs: l.minTs,
maxTs: l.maxTs,
key: key,
plist: l.plist,
}
if l.mutationMap != nil {
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
for ts, pl := range l.mutationMap {
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
}
}
return lCopy, nil
}
}

txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()

Expand All @@ -377,5 +426,10 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
itr := txn.NewKeyIterator(key, iterOpts)
defer itr.Close()
itr.Seek(key)
return ReadPostingList(key, itr)
l, err := ReadPostingList(key, itr)
if err != nil {
return l, err
}
lCache.Set(key, l, 0)
return l, nil
}
24 changes: 23 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,13 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
if proposal.Mutations.DropOp == pb.Mutations_DATA {
// Ensures nothing get written to disk due to commit proposals.
posting.Oracle().ResetTxns()
return posting.DeleteData()
if err := posting.DeleteData(); err != nil {
return err
}

// Clear entire cache.
posting.ResetCache()
return nil
}

if proposal.Mutations.DropOp == pb.Mutations_ALL {
Expand All @@ -316,6 +322,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

// Clear entire cache.
posting.ResetCache()

if groups().groupId() == 1 {
initialSchema := schema.InitialSchema()
for _, s := range initialSchema {
Expand Down Expand Up @@ -380,6 +389,12 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

// Clear the entire cache if there is a schema update because the index rebuild
// will invalidate the state.
if len(proposal.Mutations.Schema) > 0 {
posting.ResetCache()
}

for _, tupdate := range proposal.Mutations.Types {
if err := runTypeMutation(ctx, tupdate); err != nil {
return err
Expand Down Expand Up @@ -768,6 +783,13 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()])
}

// Clear all the cached lists that were touched by this transaction.
for _, status := range delta.Txns {
txn := posting.Oracle().GetTxn(status.StartTs)
txn.RemoveCachedKeys()
}
posting.WaitForCache()

// Now advance Oracle(), so we can service waiting reads.
posting.Oracle().ProcessDelta(delta)
return nil
Expand Down
3 changes: 3 additions & 0 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type WorkerOptions struct {
// queries hence it has been kept as int32. LogRequest value 1 enables logging of requests
// coming to alphas and 0 disables it.
LogRequest int32
// PlCacheMb indicates the size of the posting lists cache. A value of zero indicates the cache
// is disabled
PlCacheMb uint32
}

// WorkerConfig stores the global instance of the worker package's options.
Expand Down
4 changes: 2 additions & 2 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var (
// MaxAssignedTs records the latest max assigned timestamp.
MaxAssignedTs = stats.Int64("max_assigned_ts",
"Latest max assigned timestamp", stats.UnitDimensionless)
TxnAborts = stats.Int64("txn_aborts",
"Number of transaction aborts",stats.UnitDimensionless)
TxnAborts = stats.Int64("txn_aborts",
"Number of transaction aborts", stats.UnitDimensionless)

// Conf holds the metrics config.
// TODO: Request statistics, latencies, 500, timeouts
Expand Down