Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bulk): enable running bulk loader with only gql schema #8903

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/filestore"
gqlSchema "github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -186,7 +187,7 @@ func (ld *loader) leaseNamespaces() {
ns, err := client.AssignIds(ctx, &pb.Num{Val: maxNs, Type: pb.Num_NS_ID})
cancel()
if err == nil {
fmt.Printf("Assigned namespaces till %d", ns.GetEndId())
fmt.Printf("Assigned namespaces till %d\n", ns.GetEndId())
return
}
fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
Expand All @@ -195,6 +196,10 @@ func (ld *loader) leaseNamespaces() {
}

func readSchema(opt *options) *schema.ParsedSchema {
if opt.SchemaFile == "" {
return genDQLSchema(opt)
}

f, err := filestore.Open(opt.SchemaFile)
x.Check(err)
defer func() {
Expand Down Expand Up @@ -222,6 +227,32 @@ func readSchema(opt *options) *schema.ParsedSchema {
return result
}

// genDQLSchema derives a DQL (Dgraph) schema from the GraphQL schema file
// referenced by opt. Each namespace found in the GraphQL schema is validated
// and translated, and the resulting predicates and types are merged into a
// single ParsedSchema. When a concrete namespace is forced via opt.Namespace
// (i.e. it is not math.MaxUint64), every schema is parsed into that namespace.
func genDQLSchema(opt *options) *schema.ParsedSchema {
	rawSchema := readGqlSchema(opt)
	schemasByNs := parseGqlSchema(string(rawSchema))

	var merged schema.ParsedSchema
	for targetNs, gqlSrc := range schemasByNs {
		// A fixed namespace on the command line overrides the one
		// embedded in the GraphQL schema file.
		if opt.Namespace != math.MaxUint64 {
			targetNs = opt.Namespace
		}

		handler, err := gqlSchema.NewHandler(gqlSrc, false)
		x.Check(err)

		// Validate the generated GraphQL schema; the parsed form itself
		// is not needed here.
		_, err = gqlSchema.FromString(handler.GQLSchema(), targetNs)
		x.Check(err)

		parsed, err := schema.ParseWithNamespace(handler.DGSchema(), targetNs)
		x.Check(err)

		merged.Preds = append(merged.Preds, parsed.Preds...)
		merged.Types = append(merged.Types, parsed.Types...)
	}

	return &merged
}

func (ld *loader) mapStage() {
ld.prog.setPhase(mapPhase)
var db *badger.DB
Expand Down Expand Up @@ -332,32 +363,35 @@ func parseGqlSchema(s string) map[uint64]string {
return schemaMap
}

func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
if ld.opt.GqlSchemaFile == "" {
return
}

f, err := filestore.Open(ld.opt.GqlSchemaFile)
func readGqlSchema(opt *options) []byte {
f, err := filestore.Open(opt.GqlSchemaFile)
x.Check(err)
defer func() {
if err := f.Close(); err != nil {
glog.Warningf("error while closing fd: %v", err)
}
}()

key := ld.opt.EncryptionKey
if !ld.opt.Encrypted {
key := opt.EncryptionKey
if !opt.Encrypted {
key = nil
}
r, err := enc.GetReader(key, f)
x.Check(err)
if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
if filepath.Ext(opt.GqlSchemaFile) == ".gz" {
r, err = gzip.NewReader(r)
x.Check(err)
}

buf, err := io.ReadAll(r)
x.Check(err)
return buf
}

func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
if ld.opt.GqlSchemaFile == "" {
return
}

rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" <%#x> .
_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" <%#x> .
Expand Down Expand Up @@ -388,6 +422,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
ld.readerChunkCh <- gqlBuf
}

buf := readGqlSchema(ld.opt)
schemas := parseGqlSchema(string(buf))
if ld.opt.Namespace == math.MaxUint64 {
// Preserve the namespace.
Expand Down
16 changes: 10 additions & 6 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,16 @@ func run() {
fmt.Printf("Encrypted input: %v; Encrypted output: %v\n", opt.Encrypted, opt.EncryptedOut)

if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
}
if !filestore.Exists(opt.SchemaFile) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
// if only graphql schema is provided, we can generate DQL schema from it.
if opt.GqlSchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
os.Exit(1)
}
} else {
if !filestore.Exists(opt.SchemaFile) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
}
}
if opt.DataFiles == "" {
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
Expand Down
4 changes: 2 additions & 2 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ func (hc *HTTPClient) RunGraphqlQuery(params GraphQLParams, admin bool) ([]byte,

respBody, err := hc.doPost(reqBody, admin)
if err != nil {
return nil, errors.Wrap(err, "error while running admin query")
return nil, errors.Wrap(err, "error while running graphql query")
}

var gqlResp GraphQLResponse
if err := json.Unmarshal(respBody, &gqlResp); err != nil {
return nil, errors.Wrap(err, "error unmarshalling GQL response")
}
if len(gqlResp.Errors) > 0 {
return nil, errors.Wrapf(gqlResp.Errors, "error while running admin query, resp: %v", string(gqlResp.Data))
return nil, errors.Wrapf(gqlResp.Errors, "error while running graphql query, resp: %v", string(gqlResp.Data))
}
return gqlResp.Data, nil
}
Expand Down
8 changes: 8 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type ClusterConfig struct {
uidLease int
// exposed port offset for grpc/http port for both alpha/zero
portOffset int
bulkOutDir string
}

func NewClusterConfig() ClusterConfig {
Expand Down Expand Up @@ -163,3 +164,10 @@ func (cc ClusterConfig) WithExposedPortOffset(offset uint64) ClusterConfig {
cc.portOffset = int(offset)
return cc
}

// WithBulkLoadOutDir sets the out dir for the bulk loader. This ensures
// that the same p directory is used while setting up alphas. Returns the
// modified config (value receiver, so the original is left untouched).
func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig {
	cc.bulkOutDir = dir
	return cc
}
38 changes: 15 additions & 23 deletions dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
binaryName = "dgraph_%v"
binaryNameFmt = "dgraph_%v"
zeroNameFmt = "%v_zero%d"
zeroAliasNameFmt = "zero%d"
alphaNameFmt = "%v_alpha%d"
Expand All @@ -46,6 +46,7 @@ const (

alphaWorkingDir = "/data/alpha"
zeroWorkingDir = "/data/zero"
DefaultAlphaPDir = "/data/alpha/p"
DefaultBackupDir = "/data/backups"
DefaultExportDir = "/data/exports"

Expand Down Expand Up @@ -275,6 +276,19 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) {
})
}

if c.conf.bulkOutDir != "" {
pDir := filepath.Join(c.conf.bulkOutDir, strconv.Itoa(a.id/c.conf.replicas), "p")
if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
return nil, errors.Wrap(err, "erorr creating bulk dir")
}
mounts = append(mounts, mount.Mount{
Type: mount.TypeBind,
Source: pDir,
Target: DefaultAlphaPDir,
ReadOnly: false,
})
}

for dir, vol := range c.conf.volumes {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Expand Down Expand Up @@ -333,28 +347,6 @@ func publicPort(dcli *docker.Client, dc dnode, privatePort string) (string, erro
}

func mountBinary(c *LocalCluster) (mount.Mount, error) {
if c.conf.version == localVersion {
return mount.Mount{
Type: mount.TypeBind,
Source: filepath.Join(os.Getenv("GOPATH"), "bin"),
Target: "/gobin",
ReadOnly: true,
}, nil
}

isFileExist, err := fileExists(filepath.Join(c.tempBinDir, "dgraph"))
if err != nil {
return mount.Mount{}, err
}
if isFileExist {
return mount.Mount{
Type: mount.TypeBind,
Source: c.tempBinDir,
Target: "/gobin",
ReadOnly: true,
}, nil
}

if err := c.setupBinary(); err != nil {
return mount.Mount{}, err
}
Expand Down
18 changes: 14 additions & 4 deletions dgraphtest/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func (c *LocalCluster) dgraphImage() string {
}

func (c *LocalCluster) setupBinary() error {
isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryName, c.conf.version)))
if c.conf.version == localVersion {
fromDir := filepath.Join(os.Getenv("GOPATH"), "bin")
return copyBinary(fromDir, c.tempBinDir, c.conf.version)
}

isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryNameFmt, c.conf.version)))
if err != nil {
return err
}
Expand Down Expand Up @@ -138,15 +143,20 @@ func buildDgraphBinary(dir, binaryDir, version string) error {
return errors.Wrapf(err, "error while building dgraph binary\noutput:%v", string(out))
}
if err := copy(filepath.Join(dir, "dgraph", "dgraph"),
filepath.Join(binaryDir, fmt.Sprintf(binaryName, version))); err != nil {
filepath.Join(binaryDir, fmt.Sprintf(binaryNameFmt, version))); err != nil {
return errors.Wrap(err, "error while copying binary")
}
return nil
}

func copyBinary(fromDir, toDir, version string) error {
if err := copy(filepath.Join(fromDir, fmt.Sprintf(binaryName, version)),
filepath.Join(toDir, "dgraph")); err != nil {
binaryName := "dgraph"
if version != localVersion {
binaryName = fmt.Sprintf(binaryNameFmt, version)
}
fromPath := filepath.Join(fromDir, binaryName)
toPath := filepath.Join(toDir, "dgraph")
if err := copy(fromPath, toPath); err != nil {
return errors.Wrap(err, "error while copying binary into tempBinDir")
}
return nil
Expand Down
51 changes: 48 additions & 3 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"

"github.com/pkg/errors"
Expand All @@ -34,7 +35,7 @@ import (
)

// LiveOpts collects the inputs for running the dgraph live loader
// against a cluster.
//
// NOTE: the diff view showed both the old `RdfFiles` field and its
// renamed replacement `DataFiles`; only `DataFiles` is kept, matching
// the usages elsewhere in this change (LiveLoad, LiveLoadFromExport).
type LiveOpts struct {
	// DataFiles are the RDF/JSON data files to load.
	DataFiles []string
	// SchemaFiles are the DQL schema files to apply.
	SchemaFiles []string
	// GqlSchemaFiles are the GraphQL schema files to apply.
	GqlSchemaFiles []string
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *LocalCluster) LiveLoad(opts LiveOpts) error {

args := []string{
"live",
"--files", strings.Join(opts.RdfFiles, ","),
"--files", strings.Join(opts.DataFiles, ","),
"--alpha", strings.Join(alphaURLs, ","),
"--zero", zeroURL,
}
Expand Down Expand Up @@ -227,7 +228,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}

opts := LiveOpts{
RdfFiles: rdfFiles,
DataFiles: rdfFiles,
SchemaFiles: schemaFiles,
GqlSchemaFiles: gqlSchemaFiles,
}
Expand All @@ -236,3 +237,47 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}
return nil
}

// BulkOpts collects the inputs for running the dgraph bulk loader
// against a cluster.
type BulkOpts struct {
	// DataFiles are the RDF/JSON data files to load (passed via -f).
	DataFiles []string
	// SchemaFiles are the DQL schema files (passed via -s).
	SchemaFiles []string
	// GQLSchemaFiles are the GraphQL schema files (passed via -g).
	GQLSchemaFiles []string
}

// BulkLoad runs the dgraph bulk loader binary against the cluster's first
// zero, sharding the output across numAlphas/replicas groups and writing it
// to the configured bulkOutDir. Data, DQL-schema, and GraphQL-schema files
// from opts are passed through when present. The loader's combined output
// is logged; a non-nil error wraps both the exec failure and that output.
func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
	zeroURL, err := c.zeros[0].zeroURL(c)
	if err != nil {
		return errors.Wrap(err, "error finding URL of first zero")
	}

	shards := c.conf.numAlphas / c.conf.replicas
	args := []string{"bulk",
		"--store_xids=true",
		"--zero", zeroURL,
		"--reduce_shards", strconv.Itoa(shards),
		"--map_shards", strconv.Itoa(shards),
		"--out", c.conf.bulkOutDir,
		// we had to create the dir for setting up docker, hence, replacing it here.
		"--replace_out",
	}

	if len(opts.DataFiles) > 0 {
		args = append(args, "-f", strings.Join(opts.DataFiles, ","))
	}
	if len(opts.SchemaFiles) > 0 {
		args = append(args, "-s", strings.Join(opts.SchemaFiles, ","))
	}
	if len(opts.GQLSchemaFiles) > 0 {
		args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
	}

	log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
	cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
	out, err := cmd.CombinedOutput()
	// Early return keeps the happy path unindented (no else after return).
	if err != nil {
		return errors.Wrapf(err, "error running bulk loader: %v", string(out))
	}
	log.Printf("[INFO] ==== output for bulk loader ====")
	log.Println(string(out))
	return nil
}
8 changes: 6 additions & 2 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (c *LocalCluster) Start() error {
return err
}
}
return c.HealthCheck()
return c.HealthCheck(false)
}

func (c *LocalCluster) StartZero(id int) error {
Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *LocalCluster) killContainer(dc dnode) error {
return nil
}

func (c *LocalCluster) HealthCheck() error {
func (c *LocalCluster) HealthCheck(zeroOnly bool) error {
log.Printf("[INFO] checking health of containers")
for i := 0; i < c.conf.numZeros; i++ {
url, err := c.zeros[i].healthURL(c)
Expand All @@ -355,6 +355,10 @@ func (c *LocalCluster) HealthCheck() error {
}
log.Printf("[INFO] container [zero-%v] passed health check", i)
}
if zeroOnly {
return nil
}

for i := 0; i < c.conf.numAlphas; i++ {
url, err := c.alphas[i].healthURL(c)
if err != nil {
Expand Down
Loading