diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 74db13d968e15..1875cb1ca5423 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -667,6 +667,16 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) // SetConcurrency sets the concurrency of dbs tables files. func (rc *Client) SetConcurrency(c uint) { log.Info("download worker pool", zap.Uint("size", c)) + if rc.granularity == string(CoarseGrained) { + // we believe 32 is large enough for download worker pool. + // it won't reach the limit if sst files distribute evenly. + // when restore memory usage is still too high, we should reduce concurrencyPerStore + // to sarifice some speed to reduce memory usage. + count := uint(rc.storeCount) * rc.concurrencyPerStore * 32 + log.Info("download coarse worker pool", zap.Uint("size", count)) + rc.workerPool = utils.NewWorkerPool(count, "file") + return + } rc.workerPool = utils.NewWorkerPool(c, "file") } @@ -1544,8 +1554,8 @@ LOOPFORTABLE: // wait for download worker notified rc.fileImporter.cond.Wait() } - eg.Go(restoreFn) rc.fileImporter.cond.L.Unlock() + rc.workerPool.ApplyOnErrorGroup(eg, restoreFn) } else { // if we are not use coarse granularity which means // we still pipeline split & scatter regions and import sst files diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b103562e427c4..2e18baafd9ce3 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -140,8 +140,8 @@ func (cfg *RestoreCommonConfig) adjust() { func DefineRestoreCommonFlags(flags *pflag.FlagSet) { // TODO remove experimental tag if it's stable flags.Bool(flagOnline, false, "(experimental) Whether online when restore") - flags.String(flagGranularity, string(restore.FineGrained), "(experimental) Whether split & scatter regions using fine-grained way during restore") - flags.Uint(flagConcurrencyPerStore, 128, "(experimental) The size of thread pool on each store that executes tasks") + flags.String(flagGranularity, string(restore.FineGrained), "Whether split & scatter regions using fine-grained way during restore") + flags.Uint(flagConcurrencyPerStore, 128, "The size of thread pool on each store that executes tasks, only enabled when `--granularity=coarse-grained`") flags.Uint32(flagConcurrency, 128, "(deprecated) The size of thread pool on BR that executes tasks, "+ "where each task restores one SST file to TiKV") flags.Uint64(FlagMergeRegionSizeBytes, conn.DefaultMergeRegionSizeBytes,