Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add @unique constraint support in schema for new predicates #8827

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type loader struct {
reqNum uint64
reqs chan *request
zeroconn *grpc.ClientConn
schema *schema
schema *Schema
namespaces map[uint64]struct{}

upsertLock sync.RWMutex
Expand Down Expand Up @@ -240,7 +240,7 @@ func createValueEdge(nq *api.NQuad, sid uint64) (*pb.DirectedEdge, error) {
return p, nil
}

func fingerprintEdge(t *pb.DirectedEdge, pred *predicate) uint64 {
func fingerprintEdge(t *pb.DirectedEdge, pred *Predicate) uint64 {
var id uint64 = math.MaxUint64

// Value with a lang type.
Expand Down
21 changes: 11 additions & 10 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type options struct {
preserveNs bool
}

type predicate struct {
type Predicate struct {
Predicate string `json:"predicate,omitempty"`
Type string `json:"type,omitempty"`
Tokenizer []string `json:"tokenizer,omitempty"`
Expand All @@ -89,21 +89,22 @@ type predicate struct {
Upsert bool `json:"upsert,omitempty"`
Reverse bool `json:"reverse,omitempty"`
NoConflict bool `json:"no_conflict,omitempty"`
Unique bool `json:"unique,omitempty"`
ValueType types.TypeID
}

type schema struct {
Predicates []*predicate `json:"schema,omitempty"`
preds map[string]*predicate
type Schema struct {
Predicates []*Predicate `json:"schema,omitempty"`
preds map[string]*Predicate
}

type request struct {
*api.Mutation
conflicts []uint64
}

func (l *schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*predicate)
func (l *Schema) init(ns uint64, galaxyOperation bool) {
l.preds = make(map[string]*Predicate)
for _, i := range l.Predicates {
i.ValueType, _ = types.TypeForName(i.Type)
if !galaxyOperation {
Expand All @@ -115,7 +116,7 @@ func (l *schema) init(ns uint64, galaxyOperation bool) {

var (
opt options
sch schema
sch Schema

// Live is the sub-command invoked when running "dgraph live".
Live x.SubCommand
Expand Down Expand Up @@ -185,7 +186,7 @@ func init() {
"specific namespace. Setting it to negative value will preserve the namespace.")
}

func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*schema, error) {
func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, galaxyOperation bool) (*Schema, error) {
txn := dgraphClient.NewTxn()
defer func() {
if err := txn.Discard(ctx); err != nil {
Expand Down Expand Up @@ -500,7 +501,7 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
sort.Slice(buffer, func(i, j int) bool {
iPred := sch.preds[x.NamespaceAttr(buffer[i].Namespace, buffer[i].Predicate)]
jPred := sch.preds[x.NamespaceAttr(buffer[j].Namespace, buffer[j].Predicate)]
t := func(a *predicate) int {
t := func(a *Predicate) int {
if a != nil && a.Count {
return 1
}
Expand Down Expand Up @@ -683,7 +684,7 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN
return err
}

var sch schema
var sch Schema
err = json.Unmarshal(res.GetJson(), &sch)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (gc *GrpcClient) Mutate(mu *api.Mutation) (*api.Response, error) {
return txn.Mutate(ctx, mu)
}

func (gc *GrpcClient) Upsert(query string, mu *api.Mutation) (*api.Response, error) {
txn := gc.NewTxn()
defer func() { _ = txn.Discard(context.Background()) }()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
req := &api.Request{
Query: query,
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
return txn.Do(ctx, req)
}

// Query performa a given query in a new txn
func (gc *GrpcClient) Query(query string) (*api.Response, error) {
txn := gc.NewTxn()
Expand Down
Loading