Skip to content

Commit

Permalink
Overwrite values for uid predicates (#4883)
Browse files Browse the repository at this point in the history
When the warning to delete and re-add the value was removed, non-list uid
predicates were not being overwritten. This fixes this by iterating over the list
and replacing theexisting postings with a copy with the operation set to a delete.

It also adds a test to prevent this from happening again.

Fixes #4879
  • Loading branch information
martinmr committed Mar 17, 2020
1 parent a89303e commit cc495fc
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 11 deletions.
66 changes: 55 additions & 11 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
"github.com/golang/protobuf/proto"
)

var (
Expand Down Expand Up @@ -310,7 +311,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
}

// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting) {
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)

Expand All @@ -322,26 +323,60 @@ func (l *List) updateMutationLayer(mpost *pb.Posting) {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[mpost.StartTs] = plist
return
return nil
}

plist, ok := l.mutationMap[mpost.StartTs]
if !ok {
plist := &pb.PostingList{}
plist.Postings = append(plist.Postings, mpost)
plist = &pb.PostingList{}
if l.mutationMap == nil {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[mpost.StartTs] = plist
return
}

if singleUidUpdate {
// This handles the special case when adding a value to predicates of type uid.
// The current value should be deleted in favor of this value. This needs to
// be done because the fingerprint for the value is not math.MaxUint64 as is
// the case with the rest of the scalar predicates.
plist := &pb.PostingList{}
plist.Postings = append(plist.Postings, mpost)

err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
// Ignore values which have the same uid as they will get replaced
// by the current value.
if obj.Uid == mpost.Uid {
return nil
}

// Mark all other values as deleted. By the end of the iteration, the
// list of postings will contain deleted operations and only one set
// for the mutation stored in mpost.
objCopy := proto.Clone(obj).(*pb.Posting)
objCopy.Op = Del
plist.Postings = append(plist.Postings, objCopy)
return nil
})
if err != nil {
return err
}

// Update the mutation map with the new plist. Return here since the code below
// does not apply for predicates of type uid.
l.mutationMap[mpost.StartTs] = plist
return nil
}

// Even if we have a delete all in this transaction, we should still pick up any updates since.
for i, prev := range plist.Postings {
if prev.Uid == mpost.Uid {
plist.Postings[i] = mpost
return
return nil
}
}
plist.Postings = append(plist.Postings, mpost)
return nil
}

// TypeID returns the typeid of destination vertex
Expand Down Expand Up @@ -400,17 +435,26 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
t.ValueId = fingerprintEdge(t)
mpost.Uid = t.ValueId
}
l.updateMutationLayer(mpost)

// We ensure that commit marks are applied to posting lists in the right
// order. We can do so by proposing them in the same order as received by the Oracle delta
// stream from Zero, instead of in goroutines.
var conflictKey uint64
// Check whether this mutation is an update for a predicate of type uid.
pk, err := x.Parse(l.key)
if err != nil {
return errors.Wrapf(err, "cannot parse key when adding mutation to list with key %s",
hex.EncodeToString(l.key))
}
pred, ok := schema.State().Get(t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op == Set && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
hex.EncodeToString(l.key), mpost)
}

// We ensure that commit marks are applied to posting lists in the right
// order. We can do so by proposing them in the same order as received by the Oracle delta
// stream from Zero, instead of in goroutines.
var conflictKey uint64
switch {
case schema.State().HasNoConflict(t.Attr):
break
Expand Down
58 changes: 58 additions & 0 deletions systest/mutations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestSystem(t *testing.T) {
t.Run("infer schema as list JSON", wrap(InferSchemaAsListJSON))
t.Run("force schema as list JSON", wrap(ForceSchemaAsListJSON))
t.Run("force schema as single JSON", wrap(ForceSchemaAsSingleJSON))
t.Run("overwrite uid predicates", wrap(OverwriteUidPredicates))
}

func FacetJsonInputSupportsAnyOfTerms(t *testing.T, c *dgo.Dgraph) {
Expand Down Expand Up @@ -1848,3 +1849,60 @@ func ForceSchemaAsSingleJSON(t *testing.T, c *dgo.Dgraph) {
testutil.CompareJSON(t, `{"schema": [{"predicate":"person"}, {"predicate":"nickname"}]}`,
string(resp.Json))
}

func OverwriteUidPredicates(t *testing.T, c *dgo.Dgraph) {
ctx := context.Background()
op := &api.Operation{DropAll: true}
require.NoError(t, c.Alter(ctx, op))

op = &api.Operation{
Schema: `
best_friend: uid .
name: string @index(exact) .`,
}
err := c.Alter(ctx, op)
require.NoError(t, err)
require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema))

txn := c.NewTxn()
_, err = txn.Mutate(context.Background(), &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
_:alice <name> "Alice" .
_:bob <name> "Bob" .
_:alice <best_friend> _:bob .`),
})
require.NoError(t, err)

q := `{
me(func: eq(name, Alice)) {
name
best_friend {
name
}
}
}`
resp, err := c.NewReadOnlyTxn().Query(ctx, q)
require.NoError(t, err)
testutil.CompareJSON(t, `{"me":[{"name":"Alice","best_friend": {"name": "Bob"}}]}`,
string(resp.GetJson()))

upsertQuery := `query { alice as var(func: eq(name, Alice)) }`
upsertMutation := &api.Mutation{
SetNquads: []byte(`
_:carol <name> "Carol" .
uid(alice) <best_friend> _:carol .`),
}
req := &api.Request{
Query: upsertQuery,
Mutations: []*api.Mutation{upsertMutation},
CommitNow: true,
}
_, err = c.NewTxn().Do(ctx, req)
require.NoError(t, err)

resp, err = c.NewReadOnlyTxn().Query(ctx, q)
require.NoError(t, err)
testutil.CompareJSON(t, `{"me":[{"name":"Alice","best_friend": {"name": "Carol"}}]}`,
string(resp.GetJson()))
}

0 comments on commit cc495fc

Please sign in to comment.