Skip to content

Commit

Permalink
Validate and converting facets to binary (#2797)
Browse files Browse the repository at this point in the history
- Removed the TryValFor function
- Simplified TypeIDFor and ValFor
- Converting facets to binary format
  • Loading branch information
Lucas Wang authored Dec 5, 2018
1 parent b488fc2 commit a25d2b7
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 80 deletions.
6 changes: 5 additions & 1 deletion edgraph/nquads_from_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func parseFacets(m map[string]interface{}, prefix string) ([]*api.Facet, error)
}

// convert facet val interface{} to binary
tid := facets.TypeIDFor(&api.Facet{ValType: f.ValType})
tid, err := facets.TypeIDFor(&api.Facet{ValType: f.ValType})
if err != nil {
return nil, err
}

fVal := &types.Val{Tid: types.BinaryID}
if err := types.Marshal(types.Val{Tid: tid, Value: fv}, fVal); err != nil {
return nil, err
Expand Down
33 changes: 26 additions & 7 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,11 @@ func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) {
res.Del = append(res.Del, nqs...)
}

// We check that the facet value is in the right format based on the facet type.
for _, m := range mu.Set {
for _, f := range m.Facets {
if err := facets.TryValFor(f); err != nil {
return nil, err
}
}
// parse facets and convert to the binary format so that
// a field of type datetime like "2017-01-01" can be correctly encoded in the
// marshaled binary format as done in the time.Marshal method
if err := validateAndConvertFacets(mu); err != nil {
return nil, err
}

res.Set = append(res.Set, mu.Set...)
Expand All @@ -647,6 +645,27 @@ func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) {
return res, nil
}

func validateAndConvertFacets(mu *api.Mutation) error {
for _, m := range mu.Set {
encodedFacets := make([]*api.Facet, 0, len(m.Facets))
for _, f := range m.Facets {
// try to interpret the value as binary first
if _, err := facets.ValFor(f); err == nil {
encodedFacets = append(encodedFacets, f)
} else {
encodedFacet, err := facets.FacetFor(f.Key, string(f.Value))
if err != nil {
return err
}
encodedFacets = append(encodedFacets, encodedFacet)
}
}

m.Facets = encodedFacets
}
return nil
}

func validateNQuads(set, del []*api.NQuad) error {
for _, nq := range set {
var ostar bool
Expand Down
34 changes: 29 additions & 5 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,12 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
if pc.Params.Facet != nil && len(fcsList) > childIdx {
fs := fcsList[childIdx]
for _, f := range fs.Facets {
uc.AddValue(facetName(fieldName, f), facets.ValFor(f))
fVal, err := facets.ValFor(f)
if err != nil {
return err
}

uc.AddValue(facetName(fieldName, f), fVal)
}
}

Expand Down Expand Up @@ -483,7 +488,12 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
if pc.Params.Facet != nil && len(pc.facetsMatrix[idx].FacetsList) > 0 {
// in case of Value we have only one Facets
for _, f := range pc.facetsMatrix[idx].FacetsList[0].Facets {
dst.AddValue(facetName(fieldName, f), facets.ValFor(f))
fVal, err := facets.ValFor(f)
if err != nil {
return err
}

dst.AddValue(facetName(fieldName, f), fVal)
}
}

Expand Down Expand Up @@ -1495,11 +1505,20 @@ func (sg *SubGraph) populateFacetVars(doneVars map[string]varValue, sgPath []*Su
fvar, ok := sg.Params.FacetVar[f.Key]
if ok {
if pVal, ok := doneVars[fvar].Vals[uid]; !ok {
doneVars[fvar].Vals[uid] = facets.ValFor(f)
fVal, err := facets.ValFor(f)
if err != nil {
return err
}

doneVars[fvar].Vals[uid] = fVal
} else {
// If the value is int/float we add them up. Else we throw an error as
// many to one maps are not allowed for other types.
nVal := facets.ValFor(f)
nVal, err := facets.ValFor(f)
if err != nil {
return err
}

if nVal.Tid != types.IntID && nVal.Tid != types.FloatID {
return x.Errorf("Repeated id with non int/float value for facet var encountered.")
}
Expand Down Expand Up @@ -2210,7 +2229,12 @@ func (sg *SubGraph) sortAndPaginateUsingFacet(ctx context.Context) error {
}
}
if facet != nil {
values = append(values, []types.Val{facets.ValFor(facet)})
fVal, err := facets.ValFor(facet)
if err != nil {
return err
}

values = append(values, []types.Val{fVal})
} else {
values = append(values, []types.Val{{Value: nil}})
}
Expand Down
5 changes: 4 additions & 1 deletion query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (sg *SubGraph) getCost(matrix, list int) (cost float64,
rerr = x.Errorf("Expected 1 but got %d facets", len(fcs.Facets))
return cost, fcs, rerr
}
tv := facets.ValFor(fcs.Facets[0])
tv, err := facets.ValFor(fcs.Facets[0])
if err != nil {
return 0.0, nil, err
}
if tv.Tid == types.IntID {
cost = float64(tv.Value.(int64))
} else if tv.Tid == types.FloatID {
Expand Down
31 changes: 19 additions & 12 deletions systest/mutations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ func DeleteAllReverseIndex(t *testing.T, c *dgo.Dgraph) {
Running a query would make sure that the previous mutation for
creating the link has completed with a commitTs from zero, and the
subsequent deletion is done *AFTER* the link creation.
*/
c.NewReadOnlyTxn().Query(ctx, fmt.Sprintf("{ q(func: uid(%s)) { link { uid } }}", aId))
*/
c.NewReadOnlyTxn().Query(ctx, fmt.Sprintf("{ q(func: uid(%s)) { link { uid } }}", aId))

_, err = c.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
Expand Down Expand Up @@ -976,7 +976,7 @@ func DeleteWithExpandAll(t *testing.T, c *dgo.Dgraph) {
require.Equal(t, 0, len(r.Me))
}

func FacetsUsingNQuadsError(t *testing.T, c *dgo.Dgraph) {
func testTimeValue(t *testing.T, c *dgo.Dgraph, timeBytes []byte) {
nquads := []*api.NQuad{
&api.NQuad{
Subject: "0x01",
Expand All @@ -985,7 +985,7 @@ func FacetsUsingNQuadsError(t *testing.T, c *dgo.Dgraph) {
Facets: []*api.Facet{
{
Key: "since",
Value: []byte(time.Now().Format(time.RFC3339)),
Value: timeBytes,
ValType: api.Facet_DATETIME,
},
},
Expand All @@ -994,14 +994,6 @@ func FacetsUsingNQuadsError(t *testing.T, c *dgo.Dgraph) {
mu := &api.Mutation{Set: nquads, CommitNow: true}
ctx := context.Background()
_, err := c.NewTxn().Mutate(ctx, mu)
require.Error(t, err)
require.Contains(t, err.Error(), "Error while parsing facet")

nquads[0].Facets[0].Value, err = time.Now().MarshalBinary()
require.NoError(t, err)

mu = &api.Mutation{Set: nquads, CommitNow: true}
_, err = c.NewTxn().Mutate(context.Background(), mu)
require.NoError(t, err)

q := `query test($id: string) {
Expand All @@ -1015,6 +1007,21 @@ func FacetsUsingNQuadsError(t *testing.T, c *dgo.Dgraph) {
require.Contains(t, string(resp.Json), "since")
}

func FacetsUsingNQuadsError(t *testing.T, c *dgo.Dgraph) {
// test time in go binary format
timeBinary, err := time.Now().MarshalBinary()
require.NoError(t, err)
testTimeValue(t, c, timeBinary)

// test time in full RFC3339 string format
testTimeValue(t, c, []byte(time.Now().Format(time.RFC3339)))

// test time in partial string formats
testTimeValue(t, c, []byte("2018"))
testTimeValue(t, c, []byte("2018-01"))
testTimeValue(t, c, []byte("2018-01-01"))
}

func SkipEmptyPLForHas(t *testing.T, c *dgo.Dgraph) {
ctx := context.Background()
_, err := c.NewTxn().Mutate(ctx, &api.Mutation{
Expand Down
17 changes: 0 additions & 17 deletions types/facets/facet_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,3 @@ func ValTypeForTypeID(typId TypeID) api.Facet_ValType {
}
panic("Unhandled case in ValTypeForTypeID.")
}

// TypeIDForValType gives TypeID for api.Facet_ValType
func TypeIDForValType(valType api.Facet_ValType) TypeID {
switch valType {
case api.Facet_INT:
return IntID
case api.Facet_FLOAT:
return FloatID
case api.Facet_BOOL:
return BoolID
case api.Facet_DATETIME:
return DateTimeID
case api.Facet_STRING:
return StringID
}
panic("Unhandled case in TypeIDForValType.")
}
54 changes: 25 additions & 29 deletions types/facets/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ func FacetFor(key, val string) (*api.Facet, error) {
}

// convert facet val interface{} to binary
tid := TypeIDFor(&api.Facet{ValType: vt})
tid, err := TypeIDFor(&api.Facet{ValType: vt})
if err != nil {
return nil, err
}

fVal := &types.Val{Tid: types.BinaryID}
if err = types.Marshal(types.Val{Tid: tid, Value: v}, fVal); err != nil {
return nil, err
Expand Down Expand Up @@ -174,38 +178,30 @@ func SameFacets(a []*api.Facet, b []*api.Facet) bool {
}

// TypeIDFor gives TypeID for facet.
func TypeIDFor(f *api.Facet) types.TypeID {
switch TypeIDForValType(f.ValType) {
case IntID:
return types.IntID
case StringID:
return types.StringID
case BoolID:
return types.BoolID
case DateTimeID:
return types.DateTimeID
case FloatID:
return types.FloatID
func TypeIDFor(f *api.Facet) (types.TypeID, error) {
switch f.ValType {
case api.Facet_INT:
return types.IntID, nil
case api.Facet_FLOAT:
return types.FloatID, nil
case api.Facet_BOOL:
return types.BoolID, nil
case api.Facet_DATETIME:
return types.DateTimeID, nil
case api.Facet_STRING:
return types.StringID, nil
default:
panic("unhandled case in facetValToTypeVal")
return types.DefaultID, fmt.Errorf("Unrecognized facet type: %v", f.ValType)
}
}

// TryValFor tries to convert the facet to the its type from binary format. We use it to validate
// the facets set directly by the user during mutation.
func TryValFor(f *api.Facet) error {
val := types.Val{Tid: types.BinaryID, Value: f.Value}
typId := TypeIDFor(f)
_, err := types.Convert(val, typId)
return x.Wrapf(err, "Error while parsing facet: [%v]", f)
}

// ValFor converts Facet into types.Val.
func ValFor(f *api.Facet) types.Val {
func ValFor(f *api.Facet) (types.Val, error) {
val := types.Val{Tid: types.BinaryID, Value: f.Value}
typId := TypeIDFor(f)
v, err := types.Convert(val, typId)
x.AssertTruef(err == nil,
"We should always be able to covert facet into val. %v %v", f.Value, typId)
return v
facetTid, err := TypeIDFor(f)
if err != nil {
return types.Val{}, err
}

return types.Convert(val, facetTid)
}
27 changes: 22 additions & 5 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,29 @@ func toRDF(pl *posting.List, prefix string, readTs uint64) (*pb.KV, error) {
}
buf.WriteString(f.Key)
buf.WriteByte('=')
fVal := &types.Val{Tid: types.StringID}
x.Check(types.Marshal(facets.ValFor(f), fVal))
if facets.TypeIDFor(f) == types.StringID {
buf.WriteString(strconv.Quote(fVal.Value.(string)))

fVal, err := facets.ValFor(f)
if err != nil {
glog.Errorf("Error getting value from facet %#v:%v", f, err)
continue
}

fStringVal := &types.Val{Tid: types.StringID}
if err = types.Marshal(fVal, fStringVal); err != nil {
glog.Errorf("Error while marshaling facet value %v to string: %v",
fVal, err)
continue
}
facetTid, err := facets.TypeIDFor(f)
if err != nil {
glog.Errorf("Error getting type id from facet %#v:%v", f, err)
continue
}

if facetTid == types.StringID {
buf.WriteString(strconv.Quote(fStringVal.Value.(string)))
} else {
buf.WriteString(fVal.Value.(string))
buf.WriteString(fStringVal.Value.(string))
}
}
buf.WriteByte(')')
Expand Down
19 changes: 16 additions & 3 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,11 @@ func applyFacetsTree(postingFacets []*api.Facet, ftree *facetsTree) (bool, error
switch fnType {
case CompareAttrFn: // lt, gt, le, ge, eq
var err error
typId := facets.TypeIDFor(fc)
typId, err := facets.TypeIDFor(fc)
if err != nil {
return false, err
}

v, has := ftree.function.convertedVal[typId]
if !has {
if v, err = types.Convert(ftree.function.val, typId); err != nil {
Expand All @@ -1512,10 +1516,19 @@ func applyFacetsTree(postingFacets []*api.Facet, ftree *facetsTree) (bool, error
ftree.function.convertedVal[typId] = v
}
}
return types.CompareVals(fname, facets.ValFor(fc), v), nil
fVal, err := facets.ValFor(fc)
if err != nil {
return false, err
}

return types.CompareVals(fname, fVal, v), nil

case StandardFn: // allofterms, anyofterms
if facets.TypeIDForValType(fc.ValType) != facets.StringID {
facetType, err := facets.TypeIDFor(fc)
if err != nil {
return false, err
}
if facetType != types.StringID {
return false, nil
}
return filterOnStandardFn(fname, fc.Tokens, ftree.function.tokens)
Expand Down

0 comments on commit a25d2b7

Please sign in to comment.