Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer type of schema from JSON and RDF mutations. #4328

Merged
merged 30 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8ce14b3
Infer schema as list if necessary.
martinmr Nov 22, 2019
b7f7fdd
Initial commit .
martinmr Nov 27, 2019
4659927
Regenerate proto.
martinmr Nov 27, 2019
cfcf70b
Merge remote-tracking branch 'origin/master' into martinmr/infer-sche…
martinmr Nov 27, 2019
24ab070
Merge branch 'martinmr/infer-schema-as-list' into martinmr/json-list-…
martinmr Nov 27, 2019
27edb74
First working version.
martinmr Nov 27, 2019
fce42ad
Remove logs and change function signature.
martinmr Nov 27, 2019
91c8f5f
Merge branch 'martinmr/infer-schema-as-list' into martinmr/json-list-…
martinmr Nov 27, 2019
8f80ee7
new test.
martinmr Nov 27, 2019
7c72618
Fix bug
martinmr Nov 27, 2019
00c80a2
Add test for forcing uid preds to be single.
martinmr Nov 28, 2019
cda686a
Refactor to use single ParseMetadata object.
martinmr Dec 2, 2019
1b1d788
Remove ParseMetadata struct and fix bug.
martinmr Dec 3, 2019
ce263bd
Merge remote-tracking branch 'origin/master' into martinmr/infer-sche…
martinmr Dec 3, 2019
877f100
Merge branch 'martinmr/infer-schema-as-list' into martinmr/json-list-…
martinmr Dec 3, 2019
1cee098
Fix failing test.
martinmr Dec 3, 2019
1c25bce
Add consistency check.
martinmr Dec 3, 2019
9d53127
Merge remote-tracking branch 'origin/master' into martinmr/json-list-…
martinmr Dec 3, 2019
3dbcbbc
Refactor and move RDF logic to chunker package.
martinmr Dec 3, 2019
1aa6a12
Fix tests.
martinmr Dec 3, 2019
7e32efb
Fix shadowed import warnings.
martinmr Dec 9, 2019
1e92de0
Merge remote-tracking branch 'origin/master' into martinmr/json-list-…
martinmr Dec 9, 2019
79f38bd
Do not return an error when the hints don't match.
martinmr Dec 12, 2019
cf3b316
Merge remote-tracking branch 'origin/master' into martinmr/json-list-…
martinmr Dec 13, 2019
2466d10
Merge remote-tracking branch 'origin/master' into martinmr/json-list-…
martinmr Dec 18, 2019
2a2d428
Fix tests.
martinmr Dec 18, 2019
e797152
Revert "Fix tests."
martinmr Dec 18, 2019
a84e30c
proto changes.
martinmr Dec 19, 2019
0c3820f
Change field names to new protobuf.
martinmr Dec 19, 2019
aa97c99
Merge remote-tracking branch 'origin/master' into martinmr/json-list-…
martinmr Dec 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ func (jc *jsonChunker) Parse(chunkBuf *bytes.Buffer) error {
return nil
}

err := jc.nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
return err
return jc.nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
}

func slurpSpace(r *bufio.Reader) error {
Expand Down
35 changes: 31 additions & 4 deletions chunker/json_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"unicode"

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -233,6 +234,7 @@ type NQuadBuffer struct {
batchSize int
nquads []*api.NQuad
nqCh chan []*api.NQuad
predHints map[string]pb.Metadata_HintType
}

// NewNQuadBuffer returns a new NQuadBuffer instance with the specified batch size.
Expand All @@ -244,6 +246,7 @@ func NewNQuadBuffer(batchSize int) *NQuadBuffer {
if buf.batchSize > 0 {
buf.nquads = make([]*api.NQuad, 0, batchSize)
}
buf.predHints = make(map[string]pb.Metadata_HintType)
return buf
}

Expand All @@ -263,6 +266,23 @@ func (buf *NQuadBuffer) Push(nqs ...*api.NQuad) {
}
}

// Metadata returns the parse metadata that has been aggregated so far..
func (buf *NQuadBuffer) Metadata() *pb.Metadata {
return &pb.Metadata{
PredHints: buf.predHints,
}
}

// PushPredHint pushes and aggregates hints about the type of the predicate derived
// during the parsing. This metadata is expected to be a lot smaller than the set of
// NQuads so it's not necessary to send them in batches.
func (buf *NQuadBuffer) PushPredHint(pred string, hint pb.Metadata_HintType) {
if oldHint, ok := buf.predHints[pred]; ok && hint != oldHint {
hint = pb.Metadata_LIST
}
buf.predHints[pred] = hint
}

// Flush must be called at the end to push out all the buffered NQuads to the channel. Once Flush is
// called, this instance of NQuadBuffer should no longer be used.
func (buf *NQuadBuffer) Flush() {
Expand Down Expand Up @@ -387,6 +407,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
return mr, err
}
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
case map[string]interface{}:
if len(v) == 0 {
continue
Expand All @@ -398,6 +419,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
}
if ok {
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
continue
}

Expand All @@ -410,7 +432,9 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
nq.ObjectId = cr.uid
nq.Facets = cr.fcts
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
case []interface{}:
buf.PushPredHint(pred, pb.Metadata_LIST)
for _, item := range v {
nq := api.NQuad{
Subject: mr.uid,
Expand Down Expand Up @@ -509,12 +533,15 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error {

// ParseJSON is a convenience wrapper function to get all NQuads in one call. This can however, lead
// to high memory usage. So be careful using this.
func ParseJSON(b []byte, op int) ([]*api.NQuad, error) {
func ParseJSON(b []byte, op int) ([]*api.NQuad, *pb.Metadata, error) {
buf := NewNQuadBuffer(-1)
if err := buf.ParseJSON(b, op); err != nil {
return nil, err
err := buf.ParseJSON(b, op)
if err != nil {
return nil, nil, err
}

buf.Flush()
nqs := <-buf.Ch()
return nqs, nil
metadata := buf.Metadata()
return nqs, metadata, nil
}
30 changes: 27 additions & 3 deletions chunker/rdf_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -54,7 +55,7 @@ func sane(s string) bool {

// ParseRDFs is a convenience wrapper function to get all NQuads in one call. This can however, lead
// to high memory usage. So, be careful using this.
func ParseRDFs(b []byte) ([]*api.NQuad, error) {
func ParseRDFs(b []byte) ([]*api.NQuad, *pb.Metadata, error) {
var nqs []*api.NQuad
var l lex.Lexer
for _, line := range bytes.Split(b, []byte{'\n'}) {
Expand All @@ -63,11 +64,12 @@ func ParseRDFs(b []byte) ([]*api.NQuad, error) {
continue
}
if err != nil {
return nil, err
return nil, nil, err
}
nqs = append(nqs, &nq)
}
return nqs, nil

return nqs, calculateTypeHints(nqs), nil
}

// ParseRDF parses a mutation string and returns the N-Quad representation for it.
Expand Down Expand Up @@ -320,6 +322,28 @@ func parseFacetsRDF(it *lex.ItemIterator, rnq *api.NQuad) error {
return nil
}

// subjectPred is a type to store the count for each <subject, pred> in the mutations.
type subjectPred struct {
subject string
pred string
}

func calculateTypeHints(nqs []*api.NQuad) *pb.Metadata {
// Stores the count of <subject, pred> pairs to help figure out whether
// schemas should be created as scalars or lists of scalars.
schemaCountMap := make(map[subjectPred]int)
predHints := make(map[string]pb.Metadata_HintType)

for _, nq := range nqs {
subPredPair := subjectPred{subject: nq.Subject, pred: nq.Predicate}
schemaCountMap[subPredPair]++
if count := schemaCountMap[subPredPair]; count > 1 {
predHints[nq.Predicate] = pb.Metadata_LIST
}
}
return &pb.Metadata{PredHints: predHints}
}

var typeMap = map[string]types.TypeID{
"xs:password": types.PasswordID,
"xs:string": types.StringID,
Expand Down
29 changes: 24 additions & 5 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,23 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
return err
}

m := &pb.Mutations{Edges: edges, StartTs: qc.req.StartTs}
predHints := make(map[string]pb.Metadata_HintType)
for _, gmu := range qc.gmuList {
for pred, hint := range gmu.Metadata.GetPredHints() {
if oldHint := predHints[pred]; oldHint == pb.Metadata_LIST {
continue
}
predHints[pred] = hint
}
}
m := &pb.Mutations{
Edges: edges,
StartTs: qc.req.StartTs,
Metadata: &pb.Metadata{
PredHints: predHints,
},
}

qc.span.Annotatef(nil, "Applying mutations: %+v", m)
resp.Txn, err = query.ApplyMutations(ctx, m)
qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)
Expand Down Expand Up @@ -957,28 +973,31 @@ func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) {
res := &gql.Mutation{Cond: mu.Cond}

if len(mu.SetJson) > 0 {
nqs, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads)
nqs, md, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads)
if err != nil {
return nil, err
}
res.Set = append(res.Set, nqs...)
res.Metadata = md
}
if len(mu.DeleteJson) > 0 {
nqs, err := chunker.ParseJSON(mu.DeleteJson, chunker.DeleteNquads)
// The metadata is not currently needed for delete operations so it can be safely ignored.
nqs, _, err := chunker.ParseJSON(mu.DeleteJson, chunker.DeleteNquads)
if err != nil {
return nil, err
}
res.Del = append(res.Del, nqs...)
}
if len(mu.SetNquads) > 0 {
nqs, err := chunker.ParseRDFs(mu.SetNquads)
nqs, md, err := chunker.ParseRDFs(mu.SetNquads)
if err != nil {
return nil, err
}
res.Set = append(res.Set, nqs...)
res.Metadata = md
}
if len(mu.DelNquads) > 0 {
nqs, err := chunker.ParseRDFs(mu.DelNquads)
nqs, _, err := chunker.ParseRDFs(mu.DelNquads)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions edgraph/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestParseNQuads(t *testing.T) {
# this line is a comment
_:a <join> _:b .
`
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", "predA", &api.Value{Val: &api.Value_DefaultVal{DefaultVal: "A"}}),
Expand All @@ -59,13 +59,13 @@ func TestParseNQuads(t *testing.T) {

func TestValNquads(t *testing.T) {
nquads := `uid(m) <name> val(f) .`
_, err := chunker.ParseRDFs([]byte(nquads))
_, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
}

func TestParseNQuadsWindowsNewline(t *testing.T) {
nquads := "_:a <predA> \"A\" .\r\n_:b <predB> \"B\" ."
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", "predA", &api.Value{Val: &api.Value_DefaultVal{DefaultVal: "A"}}),
Expand All @@ -75,7 +75,7 @@ func TestParseNQuadsWindowsNewline(t *testing.T) {

func TestParseNQuadsDelete(t *testing.T) {
nquads := `_:a * * .`
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", x.Star, &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}}),
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestValidateKeys(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
nq, err := chunker.ParseRDFs([]byte(tc.nquad))
nq, _, err := chunker.ParseRDFs([]byte(tc.nquad))
require.NoError(t, err)

err = validateKeys(nq[0])
Expand Down
2 changes: 2 additions & 0 deletions gql/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Mutation struct {
Cond string
Set []*api.NQuad
Del []*api.NQuad

Metadata *pb.Metadata
}

// ParseUid parses the given string into an UID. This method returns with an error
Expand Down
18 changes: 18 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ message Mutations {
}
DropOp drop_op = 7;
string drop_value = 8;

Metadata metadata = 9;
}

message Metadata {
// HintType represents a hint that will be passed along the mutation and used
// to add the predicate to the schema if it's not already there.
enum HintType {
// DEFAULT means no hint is provided and Dgraph will follow the default behavior.
DEFAULT = 0;
// SINGLE signals that the predicate should be created as a single type (e.g string, uid).
SINGLE = 1;
// LIST signals that the predicate should be created as a list (e.g [string], [uid]).
LIST = 2;
}

// Map of predicates to their hints.
map<string, HintType> pred_hints = 1;
}

message Snapshot {
Expand Down
Loading