Skip to content

Commit

Permalink
fix(core): fix performance impact in mutation due to vector
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Oct 9, 2024
1 parent 6845dab commit 3c4b2cf
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 66 deletions.
135 changes: 69 additions & 66 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,82 +110,85 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
return []*pb.DirectedEdge{}, errors.New("invalid UID with value 0")
}

inKey := x.DataKey(info.edge.Attr, uid)
pl, err := txn.Get(inKey)
if err != nil {
return []*pb.DirectedEdge{}, err
}
data, err := pl.AllValues(txn.StartTs)
if err != nil {
return []*pb.DirectedEdge{}, err
}

if info.op == pb.DirectedEdge_DEL &&
len(data) > 0 && data[0].Tid == types.VFloatID {
// TODO look into better alternatives
// The issue here is that we will create dead nodes in the Vector Index
// assuming an HNSW index type. What we should do instead is invoke
// index.Remove(<key to dead index value>). However, we currently do
// not support this in VectorIndex code!!
// if a delete & dealing with vfloats, add this to dead node in persistent store.
// What we should do instead is invoke the factory.Remove(key) operation.
deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
deadKey := x.DataKey(deadAttr, 1)
pl, err := txn.Get(deadKey)
if len(info.factorySpecs) > 0 {
inKey := x.DataKey(info.edge.Attr, uid)
pl, err := txn.Get(inKey)
if err != nil {
return []*pb.DirectedEdge{}, err
}
data, err := pl.AllValues(txn.StartTs)
if err != nil {
return []*pb.DirectedEdge{}, err
}
var deadNodes []uint64
deadData, _ := pl.Value(txn.StartTs)
if deadData.Value == nil {
deadNodes = append(deadNodes, uid)
} else {
deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))

if info.op == pb.DirectedEdge_DEL &&
len(data) > 0 && data[0].Tid == types.VFloatID {
// TODO look into better alternatives
// The issue here is that we will create dead nodes in the Vector Index
// assuming an HNSW index type. What we should do instead is invoke
// index.Remove(<key to dead index value>). However, we currently do
// not support this in VectorIndex code!!
// if a delete & dealing with vfloats, add this to dead node in persistent store.
// What we should do instead is invoke the factory.Remove(key) operation.
deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
deadKey := x.DataKey(deadAttr, 1)
pl, err := txn.Get(deadKey)
if err != nil {
return []*pb.DirectedEdge{}, err
}
deadNodes = append(deadNodes, uid)
}
deadNodesBytes, marshalErr := json.Marshal(deadNodes)
if marshalErr != nil {
return []*pb.DirectedEdge{}, marshalErr
}
edge := &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: deadNodesBytes,
ValueType: pb.Posting_ValType(0),
}
if err := pl.addMutation(ctx, txn, edge); err != nil {
return nil, err
var deadNodes []uint64
deadData, _ := pl.Value(txn.StartTs)
if deadData.Value == nil {
deadNodes = append(deadNodes, uid)
} else {
deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
if err != nil {
return []*pb.DirectedEdge{}, err
}
deadNodes = append(deadNodes, uid)
}
deadNodesBytes, marshalErr := json.Marshal(deadNodes)
if marshalErr != nil {
return []*pb.DirectedEdge{}, marshalErr
}
edge := &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: deadNodesBytes,
ValueType: pb.Posting_ValType(0),
}
if err := pl.addMutation(ctx, txn, edge); err != nil {
return nil, err
}
}
}

// TODO: As stated earlier, we need to validate that it is okay to assume
// that we care about just data[0].
// Similarly, the current assumption is that we have at most one
// Vector Index, but this assumption may break later.
if info.op == pb.DirectedEdge_SET &&
len(data) > 0 && data[0].Tid == types.VFloatID &&
len(info.factorySpecs) > 0 {
// retrieve vector from inUuid save as inVec
inVec := types.BytesAsFloatArray(data[0].Value.([]byte))
tc := hnsw.NewTxnCache(NewViTxn(txn), txn.StartTs)
indexer, err := info.factorySpecs[0].CreateIndex(attr)
if err != nil {
return []*pb.DirectedEdge{}, err
}
edges, err := indexer.Insert(ctx, tc, uid, inVec)
if err != nil {
return []*pb.DirectedEdge{}, err
}
pbEdges := []*pb.DirectedEdge{}
for _, e := range edges {
pbe := indexEdgeToPbEdge(e)
pbEdges = append(pbEdges, pbe)
// TODO: As stated earlier, we need to validate that it is okay to assume
// that we care about just data[0].
// Similarly, the current assumption is that we have at most one
// Vector Index, but this assumption may break later.
if info.op == pb.DirectedEdge_SET &&
len(data) > 0 && data[0].Tid == types.VFloatID &&
len(info.factorySpecs) > 0 {
// retrieve vector from inUuid save as inVec
inVec := types.BytesAsFloatArray(data[0].Value.([]byte))
tc := hnsw.NewTxnCache(NewViTxn(txn), txn.StartTs)
indexer, err := info.factorySpecs[0].CreateIndex(attr)
if err != nil {
return []*pb.DirectedEdge{}, err
}
edges, err := indexer.Insert(ctx, tc, uid, inVec)
if err != nil {
return []*pb.DirectedEdge{}, err
}
pbEdges := []*pb.DirectedEdge{}
for _, e := range edges {
pbe := indexEdgeToPbEdge(e)
pbEdges = append(pbEdges, pbe)
}
return pbEdges, nil
}
return pbEdges, nil
}

tokens, err := indexTokens(ctx, info)
if err != nil {
// This data is not indexable
Expand Down
56 changes: 56 additions & 0 deletions worker/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,67 @@
package worker

import (
"context"
"fmt"
"os"
"testing"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgraph/v24/posting"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/schema"
"github.com/dgraph-io/dgraph/v24/x"
"github.com/stretchr/testify/require"
)

func BenchmarkAddMutationWithIndex(b *testing.B) {
gr = new(groupi)
gr.gid = 1
gr.tablets = make(map[string]*pb.Tablet)
addTablets := func(attrs []string, gid uint32, namespace uint64) {
for _, attr := range attrs {
gr.tablets[x.NamespaceAttr(namespace, attr)] = &pb.Tablet{GroupId: gid}
}
}

addTablets([]string{"name", "name2", "age", "http://www.w3.org/2000/01/rdf-schema#range", "",
"friend", "dgraph.type", "dgraph.graphql.xid", "dgraph.graphql.schema"},
1, x.GalaxyNamespace)
addTablets([]string{"friend_not_served"}, 2, x.GalaxyNamespace)
addTablets([]string{"name"}, 1, 0x2)

dir, err := os.MkdirTemp("", "storetest_")
x.Check(err)
defer os.RemoveAll(dir)

opt := badger.DefaultOptions(dir)
ps, err := badger.OpenManaged(opt)
x.Check(err)
pstore = ps
// Not using posting list cache
posting.Init(ps, 0)
Init(ps)
err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1)
fmt.Println(err)
if err != nil {
panic(err)
}
ctx := context.Background()
txn := posting.Oracle().RegisterStartTs(5)
attr := x.GalaxyAttr("benchmarkadd")

for i := 0; i < b.N; i++ {
edge := &pb.DirectedEdge{
Value: []byte("david"),
Attr: attr,
Entity: 1,
Op: pb.DirectedEdge_SET,
}

x.Check(runMutation(ctx, edge, txn))
}
}

func TestRemoveDuplicates(t *testing.T) {
toSet := func(uids []uint64) map[uint64]struct{} {
m := make(map[uint64]struct{})
Expand Down

0 comments on commit 3c4b2cf

Please sign in to comment.