Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(export): Fix facet export of reference type postings to JSON format #7744

Merged
merged 6 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 197 additions & 20 deletions systest/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"

"github.com/dgraph-io/dgo/v210"
Expand All @@ -36,8 +38,9 @@ import (
var (
mc *minio.Client
bucketName = "dgraph-backup"
destination = "minio://minio:9001/dgraph-backup?secure=false"
minioDest = "minio://minio:9001/dgraph-backup?secure=false"
localBackupDst = "minio://localhost:9001/dgraph-backup?secure=false"
copyExportDir = "./data/export-copy"
)

// TestExportSchemaToMinio. This test does an export, then verifies that the
Expand All @@ -48,11 +51,12 @@ func TestExportSchemaToMinio(t *testing.T) {
require.NoError(t, err)
mc.MakeBucket(bucketName, "")

setupDgraph(t)
result := requestExport(t)
setupDgraph(t, moviesData, movieSchema)
result := requestExport(t, minioDest, "rdf")

require.Equal(t, "Success", getFromJSON(result, "data", "export", "response", "code").(string))
require.Equal(t, "Export completed.", getFromJSON(result, "data", "export", "response", "message").(string))
require.Equal(t, "Export completed.",
getFromJSON(result, "data", "export", "response", "message").(string))

var files []string
for _, f := range getFromJSON(result, "data", "export", "exportedFiles").([]interface{}) {
Expand Down Expand Up @@ -93,8 +97,189 @@ var expectedSchema = `[0x0] <movie>:string .` + " " + `
dgraph.graphql.p_query
}
`
// moviesData is an N-Quads fixture of five blank nodes, each carrying a single
// <movie> string value. TestExportAndLoadJson loads it and expects
// has(movie) to count exactly five nodes.
var moviesData = `<_:x1> <movie> "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)" .
<_:x2> <movie> "Spotlight" .
<_:x3> <movie> "Moonlight" .
<_:x4> <movie> "THE SHAPE OF WATERLOO" .
<_:x5> <movie> "BLACK PUNTER" .`

func setupDgraph(t *testing.T) {
// movieSchema is the schema paired with moviesData: it declares the string
// predicate movie and a type Node that contains that predicate.
var movieSchema = `
movie: string .
type Node {
movie
}`

// TestExportAndLoadJson exports the movie fixture in JSON format, drops all
// data, live-loads the exported files back and verifies that the five movie
// nodes are present again.
func TestExportAndLoadJson(t *testing.T) {
	const countMovies = `{ q(func:has(movie)) { count(uid) } }`

	setupDgraph(t, moviesData, movieSchema)

	// Export to /data/export-data in JSON format and check the reported status.
	exportResp := requestExport(t, "/data/export-data", "json")
	code := getFromJSON(exportResp, "data", "export", "response", "code").(string)
	require.Equal(t, "Success", code)
	message := getFromJSON(exportResp, "data", "export", "response", "message").(string)
	require.Equal(t, "Export completed.", message)

	// Collect the exported file names; the export is expected to report three.
	exported := getFromJSON(exportResp, "data", "export", "exportedFiles").([]interface{})
	var files []string
	for i := range exported {
		files = append(files, exported[i].(string))
	}
	require.Equal(t, 3, len(files))
	copyToLocalFs(t)

	// The data set up above must still be queryable before it is dropped.
	beforeDrop := runQuery(t, countMovies)
	require.JSONEq(t, `{"data":{"q":[{"count": 5}]}}`, beforeDrop)

	// Drop everything and confirm that nothing is left.
	client, err := testutil.DgraphClient(testutil.SockAddr)
	require.NoError(t, err)
	err = client.Alter(context.Background(), &api.Operation{DropAll: true})
	require.NoError(t, err)

	afterDrop := runQuery(t, countMovies)
	require.JSONEq(t, `{"data": {"q": [{"count":0}]}}`, afterDrop)

	// Live load the local copy of the export; the files sit under the same
	// directory name that the export reported.
	exportDir := filepath.Join(copyExportDir, filepath.Dir(files[0]))
	loadData(t, exportDir, "json")

	afterLoad := runQuery(t, countMovies)
	require.JSONEq(t, `{"data":{"q":[{"count": 5}]}}`, afterLoad)

	dirCleanup(t)
}

// facetsData is an N-Quads fixture that attaches facets to every posting kind
// declared in facetsSchema: a [uid] edge (friend), a single uid edge (refone),
// a [string] list (predlist), plus a scalar value (pred).
var facetsData = `
_:blank-0 <name> "Carol" .
_:blank-0 <friend> _:blank-1 (close="yes") .
_:blank-1 <name> "Daryl" .

_:a <pred> "test" (f="test") .
_:a <predlist> "London" (cont="England") .
_:a <predlist> "Paris" (cont="France") .
_:a <name> "alice" .

_:b <refone> _:a (f="something") .
_:b <name> "bob" .
`

// facetsSchema is the schema paired with facetsData: name is an exact-indexed
// string, friend is a list of uids, refone is a single uid and predlist is a
// list of strings.
var facetsSchema = `
<name>: string @index(exact) .
<friend>: [uid] .
<refone>: uid .
<predlist>: [string] .
`

// TestExportAndLoadJsonFacets exports the facets fixture in JSON format, drops
// all data, live-loads the export back and verifies that facets on value,
// value-list, reference and reference-list postings are the same before the
// export and after the reload.
func TestExportAndLoadJsonFacets(t *testing.T) {
	setupDgraph(t, facetsData, facetsSchema)

	// Export to /data/export-data in JSON format and check the reported status.
	exportResp := requestExport(t, "/data/export-data", "json")
	code := getFromJSON(exportResp, "data", "export", "response", "code").(string)
	require.Equal(t, "Success", code)
	message := getFromJSON(exportResp, "data", "export", "response", "message").(string)
	require.Equal(t, "Export completed.", message)

	// Collect the exported file names; the export is expected to report three.
	exported := getFromJSON(exportResp, "data", "export", "exportedFiles").([]interface{})
	var files []string
	for i := range exported {
		files = append(files, exported[i].(string))
	}
	require.Equal(t, 3, len(files))
	copyToLocalFs(t)

	// Each case pairs a query with the JSON it must return, in the order the
	// checks are run.
	facetChecks := []struct{ query, want string }{
		{ // Value posting.
			query: `{ q(func:has(name)) { pred @facets } }`,
			want:  `{"data": {"q": [{"pred": "test", "pred|f": "test"}]}}`,
		},
		{ // Value postings of list type.
			query: `{ q(func:has(name)) { predlist @facets } }`,
			want: `{"data": {"q": [{
"predlist|cont": {"0": "England","1": "France"},
"predlist": ["London","Paris" ]}]}}`,
		},
		{ // Reference posting.
			query: `{ q(func:has(name)) { refone @facets {name} } }`,
			want:  `{"data":{"q":[{"refone":{"name":"alice","refone|f":"something"}}]}}`,
		},
		{ // Reference postings of list type.
			query: `{ q(func:has(name)) { friend @facets {name} } }`,
			want:  `{"data":{"q":[{"friend":[{"name":"Daryl","friend|close":"yes"}]}]}}`,
		},
	}
	verifyFacets := func() {
		for _, check := range facetChecks {
			got := runQuery(t, check.query)
			require.JSONEq(t, check.want, got)
		}
	}

	verifyFacets()

	// Drop everything and confirm that nothing is left.
	client, err := testutil.DgraphClient(testutil.SockAddr)
	require.NoError(t, err)
	err = client.Alter(context.Background(), &api.Operation{DropAll: true})
	require.NoError(t, err)

	afterDrop := runQuery(t, `{ q(func:has(name)) { name } }`)
	require.JSONEq(t, `{"data": {"q": []}}`, afterDrop)

	// Live load the local copy of the export; the files sit under the same
	// directory name that the export reported.
	exportDir := filepath.Join(copyExportDir, filepath.Dir(files[0]))
	loadData(t, exportDir, "json")

	// The state after loading the exported data must be the same as before.
	verifyFacets()
	dirCleanup(t)
}

// runQuery executes q through testutil.RetryQuery on a fresh client and returns
// the raw JSON result wrapped as {"data": <result>}. Any error fails the test
// immediately.
func runQuery(t *testing.T, q string) string {
	client, err := testutil.DgraphClient(testutil.SockAddr)
	require.NoError(t, err)

	resp, err := testutil.RetryQuery(client, q)
	require.NoError(t, err)

	// Embed the payload verbatim instead of decoding and re-encoding it.
	wrapped := map[string]interface{}{
		"data": json.RawMessage(string(resp.Json)),
	}
	encoded, err := json.Marshal(wrapped)
	require.NoError(t, err)
	return string(encoded)
}

func copyToLocalFs(t *testing.T) {
require.NoError(t, os.RemoveAll(copyExportDir))
srcPath := testutil.DockerPrefix + "_alpha1_1:/data/export-data"
require.NoError(t, testutil.DockerCp(srcPath, copyExportDir))
}

// loadData runs `dgraph live` against the test cluster to load an export found
// in dir.
//
// dir is the directory holding the exported files and format is the data file
// format ("json" or "rdf" as used by the callers in this file). The schema is
// read from g01.schema.gz and the data from g01.<format>.gz inside dir. Any
// error from the live loader fails the test immediately.
func loadData(t *testing.T, dir, format string) {
	// Build the paths with filepath.Join rather than string concatenation so
	// that separators are handled correctly.
	schemaFile := filepath.Join(dir, "g01.schema.gz")
	dataFile := filepath.Join(dir, "g01."+format+".gz")

	pipeline := [][]string{
		{
			testutil.DgraphBinaryPath(), "live",
			"-s", schemaFile, "-f", dataFile,
			"--alpha", testutil.SockAddr,
			"--zero", testutil.SockAddrZero,
		},
	}
	_, err := testutil.Pipeline(pipeline)
	require.NoErrorf(t, err, "Got error while loading data: %v", err)
}

// dirCleanup removes the local ./t and ./data directories and then empties
// /data/export-data by running a shell command through testutil.DockerExec
// with "alpha1" as the target. Any error fails the test immediately.
func dirCleanup(t *testing.T) {
	for _, localDir := range []string{"./t", "./data"} {
		err := os.RemoveAll(localDir)
		require.NoError(t, err)
	}

	err := testutil.DockerExec("alpha1", "bash", "-c", "rm -rf /data/export-data/*")
	require.NoError(t, err)
}

func setupDgraph(t *testing.T, nquads, schema string) {

require.NoError(t, os.MkdirAll("./data", os.ModePerm))
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
Expand All @@ -104,28 +289,19 @@ func setupDgraph(t *testing.T) {

// Add schema and types.
// The alter is retried because Alters are always blocked until the indexing is finished.
require.NoError(t, testutil.RetryAlter(dg, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))
require.NoError(t, testutil.RetryAlter(dg, &api.Operation{Schema: schema}))

// Add initial data.
_, err = dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
<_:x1> <movie> "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)" .
<_:x2> <movie> "Spotlight" .
<_:x3> <movie> "Moonlight" .
<_:x4> <movie> "THE SHAPE OF WATERLOO" .
<_:x5> <movie> "BLACK PUNTER" .
`),
SetNquads: []byte(nquads),
})
require.NoError(t, err)
}

func requestExport(t *testing.T) map[string]interface{} {
exportRequest := `mutation export($dst: String!) {
export(input: {destination: $dst}) {
func requestExport(t *testing.T, dest string, format string) map[string]interface{} {
exportRequest := `mutation export($dst: String!, $f: String!) {
export(input: {destination: $dst, format: $f}) {
response {
code
message
Expand All @@ -138,7 +314,8 @@ func requestExport(t *testing.T) map[string]interface{} {
params := testutil.GraphQLParams{
Query: exportRequest,
Variables: map[string]interface{}{
"dst": destination,
"dst": dest,
"f": format,
},
}
b, err := json.Marshal(params)
Expand Down
55 changes: 32 additions & 23 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,31 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
// We could output more compact JSON at the cost of code complexity.
// Leaving it simple for now.

// writeFacets appends every facet in pfacets to bp as a `,"<attr>|<key>":<value>`
// JSON member, where <attr> is e.attr. Values whose facet type is not numeric
// are passed through escapedString first.
//
// The key is written only after the value has been converted successfully.
// Writing it first would leave a dangling `,"<attr>|<key>":` in the output
// whenever a conversion failed, which makes the exported JSON malformed.
//
// It currently always returns nil: conversion errors are logged and ignored.
// NOTE(review): a facetToString failure stops processing the remaining facets,
// while a facets.TypeIDFor failure skips only that facet. This asymmetry is
// kept from the original code; confirm whether it is intended.
writeFacets := func(pfacets []*api.Facet) error {
	for _, fct := range pfacets {
		str, err := facetToString(fct)
		if err != nil {
			glog.Errorf("Ignoring error: %+v", err)
			return nil
		}

		tid, err := facets.TypeIDFor(fct)
		if err != nil {
			glog.Errorf("Error getting type id from facet %#v: %v", fct, err)
			continue
		}

		if !tid.IsNumber() {
			str = escapedString(str)
		}

		// Both conversions succeeded, so key and value are emitted together.
		fmt.Fprintf(bp, `,"%s|%s":`, e.attr, fct.Key)
		fmt.Fprint(bp, str)
	}
	return nil
}

continuing := false
mapStart := fmt.Sprintf(" {\"uid\":"+uidFmtStrJson+`,"namespace":"0x%x"`, e.uid, e.namespace)
err := e.pl.IterateAll(e.readTs, 0, func(p *pb.Posting) error {
Expand All @@ -154,8 +179,11 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
fmt.Fprint(bp, mapStart)
if p.PostingType == pb.Posting_REF {
fmt.Fprintf(bp, `,"%s":[`, e.attr)
fmt.Fprintf(bp, "{\"uid\":"+uidFmtStrJson+"}", p.Uid)
fmt.Fprint(bp, "]")
fmt.Fprintf(bp, "{\"uid\":"+uidFmtStrJson, p.Uid)
if err := writeFacets(p.Facets); err != nil {
return errors.Wrap(err, "While writing facets for posting_REF")
}
fmt.Fprint(bp, "}]")
} else {
if p.PostingType == pb.Posting_VALUE_LANG {
fmt.Fprintf(bp, `,"%s@%s":`, e.attr, string(p.LangTag))
Expand All @@ -178,28 +206,9 @@ func (e *exporter) toJSON() (*bpb.KVList, error) {
}

fmt.Fprint(bp, str)
}

for _, fct := range p.Facets {
fmt.Fprintf(bp, `,"%s|%s":`, e.attr, fct.Key)

str, err := facetToString(fct)
if err != nil {
glog.Errorf("Ignoring error: %+v", err)
return nil
}

tid, err := facets.TypeIDFor(fct)
if err != nil {
glog.Errorf("Error getting type id from facet %#v: %v", fct, err)
continue
if err := writeFacets(p.Facets); err != nil {
return errors.Wrap(err, "While writing facets for value postings")
}

if !tid.IsNumber() {
str = escapedString(str)
}

fmt.Fprint(bp, str)
}

fmt.Fprint(bp, "}")
Expand Down
4 changes: 2 additions & 2 deletions worker/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ func TestExportJson(t *testing.T) {
{"uid":"0x1","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x2","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x3","namespace":"0x0","friend":[{"uid":"0x5"}]},
{"uid":"0x4","namespace":"0x0","friend":[{"uid":"0x5"}],"friend|age":33,
{"uid":"0x4","namespace":"0x0","friend":[{"uid":"0x5","friend|age":33,
"friend|close":"true","friend|game":"football",
"friend|poem":"roses are red\nviolets are blue","friend|since":"2005-05-02T15:04:05Z"},
"friend|poem":"roses are red\nviolets are blue","friend|since":"2005-05-02T15:04:05Z"}]},
{"uid":"0x9","namespace":"0x2","name":"ns2"}
]
`
Expand Down