feat(Dgraph): Add experimental cache for posting lists #6245

Merged · 10 commits · Aug 24, 2020
31 changes: 31 additions & 0 deletions posting/lists.go
@@ -34,6 +34,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
@@ -131,13 +132,43 @@ func updateMemoryMetrics(lc *y.Closer) {
var (
pstore *badger.DB
closer *y.Closer
lCache *ristretto.Cache
)

// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB) {
pstore = ps
closer = y.NewCloser(1)
go updateMemoryMetrics(closer)

// Initialize cache.
// TODO(Ibrahim): Add flag to switch cache on and off. For now cache is disabled.
if true {
return
}
// TODO(Ibrahim): Replace hard-coded value with value from flag.
var err error
lCache, err = ristretto.NewCache(&ristretto.Config{
NumCounters: 200e6,
MaxCost: int64(1000 * 1024 * 1024),
BufferItems: 64,
Metrics: true,
Cost: func(val interface{}) int64 {
l, ok := val.(*List)
if !ok {
return int64(0)
}
return int64(l.DeepSize())
},
})
x.Check(err)
go func() {
m := lCache.Metrics
ticker := time.NewTicker(5 * time.Minute)
for range ticker.C {
glog.V(2).Infof("Posting list cache metrics: %s", m)
}
}()
}

// Cleanup waits until the closer has finished processing.
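For context on the ristretto configuration used in Init above: NumCounters sizes the admission counters (ristretto's docs suggest roughly 10x the number of items you expect to hold), MaxCost is the total byte budget, and the Cost callback charges each admitted *List its DeepSize(). A minimal standalone sketch of the same pattern, written for this review rather than taken from the PR, with deliberately small illustrative numbers:

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto"
)

func main() {
	// Same shape as the config in Init, scaled down for illustration.
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,     // ~10x the number of items we expect to keep
		MaxCost:     1 << 20, // 1 MB budget instead of the PR's ~1 GB
		BufferItems: 64,      // recommended default
		Metrics:     true,
		Cost: func(val interface{}) int64 {
			// Charge each entry by its payload size, analogous to List.DeepSize().
			return int64(len(val.([]byte)))
		},
	})
	if err != nil {
		panic(err)
	}

	// A cost of 0 tells ristretto to use the Cost callback above.
	cache.Set("some-key", make([]byte, 512), 0)
	cache.Wait() // Set is buffered; Wait makes it visible to the Get below.

	if v, ok := cache.Get("some-key"); ok {
		fmt.Println("cached bytes:", len(v.([]byte)))
	}
	fmt.Println(cache.Metrics)
}

In Init the numbers are hard-coded (200e6 counters, roughly 1 GB of MaxCost) pending the flag mentioned in the TODO.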
58 changes: 57 additions & 1 deletion posting/mvcc.go
@@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)

@@ -74,6 +75,9 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
if err != nil {
return err
}
// Clear the list from the cache after a rollup.
RemoveCacheFor(key)

const N = uint64(1000)
if glog.V(2) {
if count := atomic.AddUint64(&ir.count, 1); count%N == 0 {
@@ -254,6 +258,32 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
return nil
}

// ResetCache will clear all the cached lists.
func ResetCache() {
lCache.Clear()
}

// RemoveCacheFor will delete the list corresponding to the given key.
func RemoveCacheFor(key []byte) {
// TODO: investigate if this can be done by calling Set with a nil value.
lCache.Del(key)
}

// RemoveCachedKeys will delete the cached lists touched by this txn.
func (txn *Txn) RemoveCachedKeys() {
if txn == nil || txn.cache == nil {
return
}
for key := range txn.cache.deltas {
lCache.Del(key)
}
}

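// WaitForCache waits for the cache to finish applying buffered writes.
// It is currently a no-op; see the TODO below.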
func WaitForCache() {
// TODO Investigate if this is needed and why Jepsen tests fail with the cache enabled.
// lCache.Wait()
}
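A likely motivation for the WaitForCache hook: ristretto applies writes asynchronously, so Set and Del go through internal buffers and a Get racing with a recent write may not observe it until Wait() drains those buffers. A short illustrative sketch of that behaviour, mine rather than the PR's, assuming only the standard ristretto API:

// Illustration only (not part of this diff): a Get issued immediately after
// a Set may miss because the Set is still sitting in ristretto's buffers.
func demoCacheVisibility(c *ristretto.Cache) {
	c.Set([]byte("k"), []byte("v"), 1)
	_, early := c.Get([]byte("k")) // may miss: the write is still buffered
	c.Wait()                       // block until pending writes are applied
	_, late := c.Get([]byte("k"))  // hits now, unless admission rejected the item
	_, _ = early, late
}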

func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
if plist == nil {
return errors.Errorf("cannot unmarshal value to a nil posting list of key %s",
@@ -366,6 +396,27 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
cachedVal, ok := lCache.Get(key)
if ok {
l, ok := cachedVal.(*List)
if ok && l != nil {
// No need to clone the immutable layer or the key since mutations will not modify them.
lCopy := &List{
minTs: l.minTs,
maxTs: l.maxTs,
key: key,
plist: l.plist,
}
if l.mutationMap != nil {
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
for ts, pl := range l.mutationMap {
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
}
}
return lCopy, nil
}
}

txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()

@@ -377,5 +428,10 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
itr := txn.NewKeyIterator(key, iterOpts)
defer itr.Close()
itr.Seek(key)
return ReadPostingList(key, itr)
l, err := ReadPostingList(key, itr)
if err != nil {
return l, err
}
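// A zero cost defers to the Cost function configured in Init, which
// charges the list its DeepSize().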
lCache.Set(key, l, 0)
return l, nil
}
24 changes: 23 additions & 1 deletion worker/draft.go
@@ -304,7 +304,13 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
if proposal.Mutations.DropOp == pb.Mutations_DATA {
// Ensures nothing gets written to disk due to commit proposals.
posting.Oracle().ResetTxns()
return posting.DeleteData()
if err := posting.DeleteData(); err != nil {
return err
}

// Clear entire cache.
posting.ResetCache()
return nil
}

if proposal.Mutations.DropOp == pb.Mutations_ALL {
Expand All @@ -316,6 +322,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

// Clear entire cache.
posting.ResetCache()

if groups().groupId() == 1 {
initialSchema := schema.InitialSchema()
for _, s := range initialSchema {
@@ -380,6 +389,12 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

// Clear the entire cache if there is a schema update because the index rebuild
// will invalidate the state.
if len(proposal.Mutations.Schema) > 0 {
posting.ResetCache()
}

for _, tupdate := range proposal.Mutations.Types {
if err := runTypeMutation(ctx, tupdate); err != nil {
return err
@@ -768,6 +783,13 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()])
}

// Clear all the cached lists that were touched by this transaction.
for _, status := range delta.Txns {
txn := posting.Oracle().GetTxn(status.StartTs)
txn.RemoveCachedKeys()
}
posting.WaitForCache()
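// Removing these keys before the Oracle advances below should ensure that
// reads served at the new MaxAssigned cannot pick up stale cached lists.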

// Now advance Oracle(), so we can service waiting reads.
posting.Oracle().ProcessDelta(delta)
return nil
4 changes: 2 additions & 2 deletions x/metrics.go
@@ -81,8 +81,8 @@ var (
// MaxAssignedTs records the latest max assigned timestamp.
MaxAssignedTs = stats.Int64("max_assigned_ts",
"Latest max assigned timestamp", stats.UnitDimensionless)
TxnAborts = stats.Int64("txn_aborts",
"Number of transaction aborts",stats.UnitDimensionless)
TxnAborts = stats.Int64("txn_aborts",
"Number of transaction aborts", stats.UnitDimensionless)

// Conf holds the metrics config.
// TODO: Request statistics, latencies, 500, timeouts