From 6043230cdef73fd2194d2c416ff7ac0aade4d44b Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Tue, 7 Apr 2020 14:18:15 -0700
Subject: [PATCH] Optimize uid allocation in live loader.

The live loader is having trouble loading exported data with the existing
uids because there are too many requests for new uids. The current version
requests new uids to be allocated for every uid greater than the current
maximum. In the exported data, the uids can come in increasing order, which
causes a new request for uids with every NQuad.

This PR changes the code to pre-allocate the uids, once per batch of NQuads
received from the NQuad buffer channel.

Tested it with the 1 million movie data set and now I am getting times
similar to the live loader with the --new_uids option enabled.

Fixes #4996
---
 dgraph/cmd/live/run.go | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go
index 6c836d5eed8..14171988f72 100644
--- a/dgraph/cmd/live/run.go
+++ b/dgraph/cmd/live/run.go
@@ -205,7 +205,6 @@ func (l *loader) uid(val string) string {
 	// later to another node. It is up to the user to avoid this.
 	if !opt.newUids {
 		if uid, err := strconv.ParseUint(val, 0, 64); err == nil {
-			l.alloc.BumpTo(uid)
 			return fmt.Sprintf("%#x", uid)
 		}
 	}
@@ -214,6 +213,34 @@ func (l *loader) uid(val string) string {
 	return fmt.Sprintf("%#x", uint64(uid))
 }
 
+// allocateUids looks for the maximum uid value in the given NQuads and bumps the
+// maximum seen uid to that value.
+func (l *loader) allocateUids(nqs []*api.NQuad) {
+	if opt.newUids {
+		// uid() keeps input uids verbatim only when newUids is off; nothing to reserve.
+		return
+	}
+
+	var maxUid uint64
+	for _, nq := range nqs {
+		// Check subject and object independently: a subject that is not a
+		// numeric uid must not hide a numeric uid carried by the object.
+		for _, val := range [2]string{nq.Subject, nq.ObjectId} {
+			uid, err := strconv.ParseUint(val, 0, 64)
+			if err != nil {
+				continue // non-numeric identifier or empty ObjectId
+			}
+			if uid > maxUid {
+				maxUid = uid
+			}
+		}
+	}
+
+	// Bump once per batch instead of once per NQuad; the per-NQuad bump is
+	// what made loading exported data slow.
+	l.alloc.BumpTo(maxUid)
+}
+
 // processFile forwards a file to the RDF or JSON processor as appropriate
 func (l *loader) processFile(ctx context.Context, filename string) error {
 	fmt.Printf("Processing data file %q\n", filename)
@@ -281,6 +308,8 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
 			if len(nqs) == 0 {
 				continue
 			}
+
+			l.allocateUids(nqs)
 			for _, nq := range nqs {
 				nq.Subject = l.uid(nq.Subject)
 				if len(nq.ObjectId) > 0 {