diff --git a/worker/mutation.go b/worker/mutation.go index 002c2adc7a5..e7fecbd9c65 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -182,18 +182,15 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs var wg sync.WaitGroup wg.Add(1) defer wg.Done() - // This throttle allows is used to limit the number of files which are opened simultaneously - // by badger while building indexes for predicates in background. - maxOpenFileLimit, err := x.QueryMaxOpenFiles() - if err != nil { - // Setting to default value on unix systems - maxOpenFileLimit = 1024 - } - glog.Infof("Max open files limit: %d", maxOpenFileLimit) + // Badger opens around 8 files for indexing per predicate. - // The throttle limit is set to maxOpenFileLimit/8 to ensure that indexing does not throw - // "Too many open files" error. - throttle := y.NewThrottle(maxOpenFileLimit / 8) + // The throttle limit is set to 1024 to ensure that indexing + // does not throw "Too many open files" error. + maxPredicateAtATime := 1024 + if len(updates) < maxPredicateAtATime { + maxPredicateAtATime = len(updates) + } + throttle := y.NewThrottle(maxPredicateAtATime) buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild, c *z.Closer) { // In case background indexing is running, we should call it here again. @@ -237,6 +234,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs // Start opIndexing task only if schema update needs to build the indexes. if shouldRebuild && !gr.Node.isRunningTask(opIndexing) { + var err error closer, err = gr.Node.startTaskAtTs(opIndexing, startTs) if err != nil { return err