diff --git a/edgraph/multi_tenancy_ee.go b/edgraph/multi_tenancy_ee.go index 0f3bb5759f2..22abeada191 100644 --- a/edgraph/multi_tenancy_ee.go +++ b/edgraph/multi_tenancy_ee.go @@ -110,9 +110,6 @@ func (s *Server) CreateNamespace(ctx context.Context, passwd string) (uint64, er return 0, err } - if err = worker.WaitForIndexing(ctx, true); err != nil { - return 0, errors.Wrap(err, "Creating namespace, got error: ") - } err = x.RetryUntilSuccess(10, 100*time.Millisecond, func() error { return createGuardianAndGroot(ctx, ids.StartId, passwd) }) diff --git a/worker/draft.go b/worker/draft.go index fb65e3b545f..a851e2ddb2d 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -195,6 +195,13 @@ func (n *node) waitForTask(id op) { closer.Wait() } +func (n *node) isRunningTask(id op) bool { + n.opsLock.Lock() + _, ok := n.ops[id] + n.opsLock.Unlock() + return ok +} + func (n *node) stopAllTasks() { defer n.closer.Done() // CLOSER:1 <-n.closer.HasBeenClosed() diff --git a/worker/mutation.go b/worker/mutation.go index 584a166920a..ccafab98e6f 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -176,13 +176,6 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs } } - // Ensure that rollup is not running. - closer, err := gr.Node.startTask(opIndexing) - if err != nil { - return err - } - defer stopIndexing(closer) - buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error { wrtCtx := schema.GetWriteContext(context.Background()) if err := rebuild.BuildIndexes(wrtCtx); err != nil { @@ -214,9 +207,9 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs // "Too many open files" error. throttle := y.NewThrottle(maxOpenFileLimit / 8) - buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) { + buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild, c *z.Closer) { // In case background indexing is running, we should call it here again. - defer stopIndexing(closer) + defer stopIndexing(c) // We should only start building indexes once this function has returned. // This is in order to ensure that we do not call DropPrefix for one predicate @@ -233,6 +226,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs throttle.Done(nil) } + var closer *z.Closer for _, su := range updates { if tablet, err := groups().Tablet(su.Predicate); err != nil { return err @@ -251,6 +245,17 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs OldSchema: &old, CurrentSchema: su, } + shouldRebuild := ok && rebuild.NeedIndexRebuild() + + // Start opIndexing task only if schema update needs to build the indexes. + if shouldRebuild && !gr.Node.isRunningTask(opIndexing) { + closer, err = gr.Node.startTask(opIndexing) + if err != nil { + return err + } + defer stopIndexing(closer) + } + querySchema := rebuild.GetQuerySchema() // Sets the schema only in memory. The schema is written to // disk only after schema mutations are successful. @@ -273,8 +278,8 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs return err } - if ok && rebuild.NeedIndexRebuild() { - go buildIndexes(su, rebuild) + if shouldRebuild { + go buildIndexes(su, rebuild, closer) } else if err := updateSchema(su); err != nil { return err }