Skip to content

Commit

Permalink
fix(export): Fix facet export of reference type postings to JSON form…
Browse files Browse the repository at this point in the history
…at (#7744) (#7756)

Fix the export of facets in the JSON format and add integration
test for export for various cases of facets.

(cherry picked from commit aff03e5)
(cherry picked from commit ea927fb)
  • Loading branch information
ahsanbarkati committed May 27, 2021
1 parent 8259e24 commit 2136479
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 45 deletions.
217 changes: 197 additions & 20 deletions systest/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"

"github.com/dgraph-io/dgo/v210"
Expand All @@ -36,8 +38,9 @@ import (
var (
mc *minio.Client
bucketName = "dgraph-backup"
destination = "minio://minio:9001/dgraph-backup?secure=false"
minioDest = "minio://minio:9001/dgraph-backup?secure=false"
localBackupDst = "minio://localhost:9001/dgraph-backup?secure=false"
copyExportDir = "./data/export-copy"
)

// TestExportSchemaToMinio. This test does an export, then verifies that the
Expand All @@ -48,11 +51,12 @@ func TestExportSchemaToMinio(t *testing.T) {
require.NoError(t, err)
mc.MakeBucket(bucketName, "")

setupDgraph(t)
result := requestExport(t)
setupDgraph(t, moviesData, movieSchema)
result := requestExport(t, minioDest, "rdf")

require.Equal(t, "Success", getFromJSON(result, "data", "export", "response", "code").(string))
require.Equal(t, "Export completed.", getFromJSON(result, "data", "export", "response", "message").(string))
require.Equal(t, "Export completed.",
getFromJSON(result, "data", "export", "response", "message").(string))

var files []string
for _, f := range getFromJSON(result, "data", "export", "exportedFiles").([]interface{}) {
Expand Down Expand Up @@ -93,8 +97,189 @@ var expectedSchema = `[0x0] <movie>:string .` + " " + `
dgraph.graphql.p_query
}
`
var moviesData = `<_:x1> <movie> "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)" .
<_:x2> <movie> "Spotlight" .
<_:x3> <movie> "Moonlight" .
<_:x4> <movie> "THE SHAPE OF WATERLOO" .
<_:x5> <movie> "BLACK PUNTER" .`

func setupDgraph(t *testing.T) {
var movieSchema = `
movie: string .
type Node {
movie
}`

func TestExportAndLoadJson(t *testing.T) {
setupDgraph(t, moviesData, movieSchema)

// Run export
result := requestExport(t, "/data/export-data", "json")
require.Equal(t, "Success", getFromJSON(result, "data", "export", "response", "code").(string))
require.Equal(t, "Export completed.",
getFromJSON(result, "data", "export", "response", "message").(string))

var files []string
for _, f := range getFromJSON(result, "data", "export", "exportedFiles").([]interface{}) {
files = append(files, f.(string))
}
require.Equal(t, 3, len(files))
copyToLocalFs(t)

q := `{ q(func:has(movie)) { count(uid) } }`

res := runQuery(t, q)
require.JSONEq(t, `{"data":{"q":[{"count": 5}]}}`, res)

// Drop all data
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)
err = dg.Alter(context.Background(), &api.Operation{DropAll: true})
require.NoError(t, err)

res = runQuery(t, q)
require.JSONEq(t, `{"data": {"q": [{"count":0}]}}`, res)

// Live load the exported data
base := filepath.Dir(files[0])
dir := filepath.Join(copyExportDir, base)
loadData(t, dir, "json")

res = runQuery(t, q)
require.JSONEq(t, `{"data":{"q":[{"count": 5}]}}`, res)

dirCleanup(t)
}

var facetsData = `
_:blank-0 <name> "Carol" .
_:blank-0 <friend> _:blank-1 (close="yes") .
_:blank-1 <name> "Daryl" .
_:a <pred> "test" (f="test") .
_:a <predlist> "London" (cont="England") .
_:a <predlist> "Paris" (cont="France") .
_:a <name> "alice" .
_:b <refone> _:a (f="something") .
_:b <name> "bob" .
`

var facetsSchema = `
<name>: string @index(exact) .
<friend>: [uid] .
<refone>: uid .
<predlist>: [string] .
`

func TestExportAndLoadJsonFacets(t *testing.T) {
setupDgraph(t, facetsData, facetsSchema)

// Run export
result := requestExport(t, "/data/export-data", "json")
require.Equal(t, "Success", getFromJSON(result, "data", "export", "response", "code").(string))
require.Equal(t, "Export completed.",
getFromJSON(result, "data", "export", "response", "message").(string))

var files []string
for _, f := range getFromJSON(result, "data", "export", "exportedFiles").([]interface{}) {
files = append(files, f.(string))
}
require.Equal(t, 3, len(files))
copyToLocalFs(t)

checkRes := func() {
// Check value posting.
q := `{ q(func:has(name)) { pred @facets } }`
res := runQuery(t, q)
require.JSONEq(t, `{"data": {"q": [{"pred": "test", "pred|f": "test"}]}}`, res)

// Check value postings of list type.
q = `{ q(func:has(name)) { predlist @facets } }`
res = runQuery(t, q)
require.JSONEq(t, `{"data": {"q": [{
"predlist|cont": {"0": "England","1": "France"},
"predlist": ["London","Paris" ]}]}}`, res)

// Check reference posting.
q = `{ q(func:has(name)) { refone @facets {name} } }`
res = runQuery(t, q)
require.JSONEq(t,
`{"data":{"q":[{"refone":{"name":"alice","refone|f":"something"}}]}}`, res)

// Check reference postings of list type.
q = `{ q(func:has(name)) { friend @facets {name} } }`
res = runQuery(t, q)
require.JSONEq(t,
`{"data":{"q":[{"friend":[{"name":"Daryl","friend|close":"yes"}]}]}}`, res)
}

checkRes()

// Drop all data
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)
err = dg.Alter(context.Background(), &api.Operation{DropAll: true})
require.NoError(t, err)

res := runQuery(t, `{ q(func:has(name)) { name } }`)
require.JSONEq(t, `{"data": {"q": []}}`, res)

// Live load the exported data and verify that exported data is loaded correctly.
base := filepath.Dir(files[0])
dir := filepath.Join(copyExportDir, base)
loadData(t, dir, "json")

// verify that the state after loading the exported data as same.
checkRes()
dirCleanup(t)
}

func runQuery(t *testing.T, q string) string {
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)

resp, err := testutil.RetryQuery(dg, q)
require.NoError(t, err)
response := map[string]interface{}{}
response["data"] = json.RawMessage(string(resp.Json))

jsonResponse, err := json.Marshal(response)
require.NoError(t, err)
return string(jsonResponse)
}

func copyToLocalFs(t *testing.T) {
require.NoError(t, os.RemoveAll(copyExportDir))
srcPath := testutil.DockerPrefix + "_alpha1_1:/data/export-data"
require.NoError(t, testutil.DockerCp(srcPath, copyExportDir))
}

func loadData(t *testing.T, dir, format string) {
schemaFile := dir + "/g01.schema.gz"
dataFile := dir + "/g01." + format + ".gz"

pipeline := [][]string{
{testutil.DgraphBinaryPath(), "live",
"-s", schemaFile, "-f", dataFile, "--alpha",
testutil.SockAddr, "--zero", testutil.SockAddrZero,
},
}
_, err := testutil.Pipeline(pipeline)
require.NoErrorf(t, err, "Got error while loading data: %v", err)

}

func dirCleanup(t *testing.T) {
require.NoError(t, os.RemoveAll("./t"))
require.NoError(t, os.RemoveAll("./data"))

cmd := []string{"bash", "-c", "rm -rf /data/export-data/*"}
require.NoError(t, testutil.DockerExec("alpha1", cmd...))
}

func setupDgraph(t *testing.T, nquads, schema string) {

require.NoError(t, os.MkdirAll("./data", os.ModePerm))
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
Expand All @@ -104,28 +289,19 @@ func setupDgraph(t *testing.T) {

// Add schema and types.
// this is because Alters are always blocked until the indexing is finished.
require.NoError(t, testutil.RetryAlter(dg, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))
require.NoError(t, testutil.RetryAlter(dg, &api.Operation{Schema: schema}))

// Add initial data.
_, err = dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
<_:x1> <movie> "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)" .
<_:x2> <movie> "Spotlight" .
<_:x3> <movie> "Moonlight" .
<_:x4> <movie> "THE SHAPE OF WATERLOO" .
<_:x5> <movie> "BLACK PUNTER" .
`),
SetNquads: []byte(nquads),
})
require.NoError(t, err)
}

func requestExport(t *testing.T) map[string]interface{} {
exportRequest := `mutation export($dst: String!) {
export(input: {destination: $dst}) {
func requestExport(t *testing.T, dest string, format string) map[string]interface{} {
exportRequest := `mutation export($dst: String!, $f: String!) {
export(input: {destination: $dst, format: $f}) {
response {
code
message
Expand All @@ -138,7 +314,8 @@ func requestExport(t *testing.T) map[string]interface{} {
params := testutil.GraphQLParams{
Query: exportRequest,
Variables: map[string]interface{}{
"dst": destination,
"dst": dest,
"f": format,
},
}
b, err := json.Marshal(params)
Expand Down
55 changes: 32 additions & 23 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,31 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
// We could output more compact JSON at the cost of code complexity.
// Leaving it simple for now.

writeFacets := func(pfacets []*api.Facet) error {
for _, fct := range pfacets {
fmt.Fprintf(bp, `,"%s|%s":`, e.attr, fct.Key)

str, err := facetToString(fct)
if err != nil {
glog.Errorf("Ignoring error: %+v", err)
return nil
}

tid, err := facets.TypeIDFor(fct)
if err != nil {
glog.Errorf("Error getting type id from facet %#v: %v", fct, err)
continue
}

if !tid.IsNumber() {
str = escapedString(str)
}

fmt.Fprint(bp, str)
}
return nil
}

continuing := false
mapStart := fmt.Sprintf(" {\"uid\":"+uidFmtStrJson+`,"namespace":"0x%x"`, e.uid, e.namespace)
err := e.pl.Iterate(e.readTs, 0, func(p *pb.Posting) error {
Expand All @@ -155,8 +180,11 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
fmt.Fprint(bp, mapStart)
if p.PostingType == pb.Posting_REF {
fmt.Fprintf(bp, `,"%s":[`, e.attr)
fmt.Fprintf(bp, "{\"uid\":"+uidFmtStrJson+"}", p.Uid)
fmt.Fprint(bp, "]")
fmt.Fprintf(bp, "{\"uid\":"+uidFmtStrJson, p.Uid)
if err := writeFacets(p.Facets); err != nil {
return errors.Wrap(err, "While writing facets for posting_REF")
}
fmt.Fprint(bp, "}]")
} else {
if p.PostingType == pb.Posting_VALUE_LANG {
fmt.Fprintf(bp, `,"%s@%s":`, e.attr, string(p.LangTag))
Expand All @@ -179,28 +207,9 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
}

fmt.Fprint(bp, str)
}

for _, fct := range p.Facets {
fmt.Fprintf(bp, `,"%s|%s":`, e.attr, fct.Key)

str, err := facetToString(fct)
if err != nil {
glog.Errorf("Ignoring error: %+v", err)
return nil
}

tid, err := facets.TypeIDFor(fct)
if err != nil {
glog.Errorf("Error getting type id from facet %#v: %v", fct, err)
continue
if err := writeFacets(p.Facets); err != nil {
return errors.Wrap(err, "While writing facets for value postings")
}

if !tid.IsNumber() {
str = escapedString(str)
}

fmt.Fprint(bp, str)
}

fmt.Fprint(bp, "}")
Expand Down
4 changes: 2 additions & 2 deletions worker/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ func TestExportJson(t *testing.T) {
{"uid":"0x1","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x2","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x3","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x4","namespace":"0x0","friend":[{"uid":"0x5"}],"friend|age":33,
{"uid":"0x4","namespace":"0x0","friend":[{"uid":"0x5","friend|age":33,
"friend|close":"true","friend|game":"football",
"friend|poem":"roses are red\nviolets are blue","friend|since":"2005-05-02T15:04:05Z"},
"friend|poem":"roses are red\nviolets are blue","friend|since":"2005-05-02T15:04:05Z"}]},
{"uid":"0x9","namespace":"0x2","name":"ns2"}
]
`
Expand Down

0 comments on commit 2136479

Please sign in to comment.