Skip to content

Commit

Permalink
Discard earlier versions of posting lists (dgraph-io#2859)
Browse files Browse the repository at this point in the history
* Move TxnWriter to posting pkg, so it can discard earlier versions based on Posting bits.
* Fix build breakages
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 5be74c4 commit 5491f0a
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *countIndexer) addUid(rawKey []byte, count int) {
}

func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64) {
writer := x.NewTxnWriter(c.db)
writer := posting.NewTxnWriter(c.db)
writer.BlindWrite = true

for count, uids := range counts {
Expand Down
4 changes: 2 additions & 2 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
itr := txn.NewIterator(opt)
defer itr.Close()

writer := x.NewTxnWriter(pstore)
writer := NewTxnWriter(pstore)
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
if !remove(item.Key()) {
Expand Down Expand Up @@ -507,7 +507,7 @@ func (r *rebuild) Run(ctx context.Context) error {
}

// Now we write all the created posting lists to disk.
writer := x.NewTxnWriter(pstore)
writer := NewTxnWriter(pstore)
for key := range txn.deltas {
pl, err := txn.Get([]byte(key))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
require.NoError(t, err)
}

writer := x.NewTxnWriter(pstore)
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
require.NoError(t, txn.CommitToMemory(commitTs))
Expand Down
2 changes: 1 addition & 1 deletion posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (t *Txn) Fill(ctx *api.TxnContext) {
// generate state. The state should only be generated by rollup, which in turn should look at the
// last Snapshot Ts, to determine how much of the PL to rollup. We only want to roll up the deltas,
// with commit ts <= snapshot ts, and not above.
func (tx *Txn) CommitToDisk(writer *x.TxnWriter, commitTs uint64) error {
func (tx *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
if commitTs == 0 {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestPostingListRead(t *testing.T) {
addEdgeToUID(t, "emptypl", 1, 2, 1, 2)
addEdgeToUID(t, "emptypl", 1, 3, 3, 4)

writer := x.NewTxnWriter(pstore)
writer := NewTxnWriter(pstore)
require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6))
require.NoError(t, writer.Flush())
assertLength(7, 0)
Expand All @@ -87,7 +87,7 @@ func TestPostingListRead(t *testing.T) {
data, err := empty.Marshal()
require.NoError(t, err)

writer = x.NewTxnWriter(pstore)
writer = NewTxnWriter(pstore)
require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10))
require.NoError(t, writer.Flush())
assertLength(10, 0)
Expand Down
16 changes: 12 additions & 4 deletions x/badger.go → posting/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
* limitations under the License.
*/

package x
package posting

import (
"math"
"sync"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

Expand Down Expand Up @@ -99,15 +100,22 @@ func (w *TxnWriter) SetAt(key, val []byte, meta byte, ts uint64) error {
} else if item.Version() >= ts {
// Found an existing commit at an equal or higher timestamp. So, skip writing.
if glog.V(2) {
pk := Parse(key)
pk := x.Parse(key)
glog.Warningf("Existing >= Commit [%d >= %d]. Skipping write: %v",
item.Version(), ts, pk)
}
return nil
}
}
if err := txn.SetWithMeta(key, val, meta); err != nil {
return err
switch meta {
case BitCompletePosting, BitEmptyPosting:
if err := txn.SetWithDiscard(key, val, meta); err != nil {
return err
}
default:
if err := txn.SetWithMeta(key, val, meta); err != nil {
return err
}
}
w.wg.Add(1)
return txn.CommitAt(ts, w.cb)
Expand Down
2 changes: 1 addition & 1 deletion query/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func addEdge(t *testing.T, attr string, src uint64, edge *pb.DirectedEdge) {
// The following logic is based on node.commitOrAbort in worker/draft.go.
// We need to commit to disk, so secondary indices, particularly the ones
// which iterate over Badger, would work correctly.
writer := x.NewTxnWriter(ps)
writer := posting.NewTxnWriter(ps)
require.NoError(t, txn.CommitToDisk(writer, commit))
require.NoError(t, writer.Flush())

Expand Down
4 changes: 2 additions & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (n *node) processApplyCh() {

func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
// First let's commit all mutations to disk.
writer := x.NewTxnWriter(pstore)
writer := posting.NewTxnWriter(pstore)
toDisk := func(start, commit uint64) {
txn := posting.Oracle().GetTxn(start)
if txn == nil {
Expand Down Expand Up @@ -779,7 +779,7 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := x.NewTxnWriter(pstore)
writer := posting.NewTxnWriter(pstore)
writer.BlindWrite = true // Do overwrite keys.

stream := pstore.NewStreamAt(readTs)
Expand Down
2 changes: 1 addition & 1 deletion worker/predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) {

commit := commitTs(startTs)

writer := x.NewTxnWriter(pstore)
writer := posting.NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commit))
require.NoError(t, writer.Flush())
require.NoError(t, txn.CommitToMemory(commit))
Expand Down
3 changes: 2 additions & 1 deletion worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {

// We can use count to check the number of posting lists returned in tests.
count := 0
writer := x.NewTxnWriter(pstore)
writer := posting.NewTxnWriter(pstore)
writer.BlindWrite = true // Do overwrite keys.
for {
kvs, err := stream.Recv()
Expand Down

0 comments on commit 5491f0a

Please sign in to comment.