diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index e14684509c891..1b6211583222f 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -513,6 +513,6 @@ type EngineWriter interface { Close(ctx context.Context) (ChunkFlushStatus, error) } -func (engine *OpenedEngine) GetEngineUuid() uuid.UUID { +func (engine *OpenedEngine) GetEngineUUID() uuid.UUID { return engine.uuid } diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel new file mode 100644 index 0000000000000..e75a8e1963344 --- /dev/null +++ b/ddl/ingest/BUILD.bazel @@ -0,0 +1,58 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "ingest", + srcs = [ + "backend.go", + "backend_mgr.go", + "config.go", + "disk_root.go", + "engine.go", + "engine_mgr.go", + "env.go", + "mem_root.go", + "message.go", + ], + importpath = "github.com/pingcap/tidb/ddl/ingest", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/backend/local", + "//br/pkg/lightning/checkpoints", + "//br/pkg/lightning/common", + "//br/pkg/lightning/config", + "//br/pkg/lightning/errormanager", + "//br/pkg/lightning/glue", + "//br/pkg/lightning/log", + "//config", + "//kv", + "//parser", + "//parser/model", + "//parser/mysql", + "//sessionctx/variable", + "//table", + "//util/generic", + "//util/logutil", + "//util/mathutil", + "//util/size", + "@com_github_google_uuid//:uuid", + "@com_github_pingcap_errors//:errors", + "@com_github_pkg_errors//:errors", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "ingest_test", + srcs = [ + "env_test.go", + "mem_root_test.go", + ], + flaky = True, + deps = [ + ":ingest", + "//config", + "@com_github_stretchr_testify//require", + ], +) diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go new file mode 100644 index 0000000000000..63034f0be3a22 --- /dev/null +++ b/ddl/ingest/backend.go @@ -0,0 +1,121 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/config" + tikv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +// BackendContext store a backend info for add index reorg task. +type BackendContext struct { + jobID int64 + backend *backend.Backend + ctx context.Context + cfg *config.Config + EngMgr engineManager + sysVars map[string]string + diskRoot DiskRoot + done bool +} + +// FinishImport imports all the key-values in engine into the storage, collects the duplicate errors if any, and +// removes the engine from the backend context. +func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error { + ei, exist := bc.EngMgr.Load(indexID) + if !exist { + return errors.New(LitErrGetEngineFail) + } + + err := ei.ImportAndClean() + if err != nil { + return err + } + + // Check remote duplicate value for the index. + if unique { + hasDupe, err := bc.backend.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &kv.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + SysVars: bc.sysVars, + IndexID: ei.indexID, + }) + if err != nil { + logutil.BgLogger().Error(LitInfoRemoteDupCheck, zap.Error(err), + zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) + return errors.New(LitInfoRemoteDupCheck) + } else if hasDupe { + logutil.BgLogger().Error(LitErrRemoteDupExistErr, + zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) + return tikv.ErrKeyExists + } + } + return nil +} + +const importThreshold = 0.85 + +// Flush checks the disk quota and imports the current key-values in engine to the storage. +func (bc *BackendContext) Flush(indexID int64) error { + ei, exist := bc.EngMgr.Load(indexID) + if !exist { + logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID)) + return errors.New(LitErrGetEngineFail) + } + + err := bc.diskRoot.UpdateUsageAndQuota() + if err != nil { + logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Int64("index ID", indexID)) + return err + } + + if bc.diskRoot.CurrentUsage() >= uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) { + // TODO: it should be changed according checkpoint solution. + // Flush writer cached data into local disk for engine first. + err := ei.Flush() + if err != nil { + return err + } + logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID), + zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()), + zap.Uint64("max disk quota", bc.diskRoot.MaxQuota())) + err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)) + if err != nil { + logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID), + zap.Error(err), zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()), + zap.Uint64("max disk quota", bc.diskRoot.MaxQuota())) + return err + } + } + return nil +} + +// Done returns true if the lightning backfill is done. +func (bc *BackendContext) Done() bool { + return bc.done +} + +// SetDone sets the done flag. +func (bc *BackendContext) SetDone() { + bc.done = true +} diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go new file mode 100644 index 0000000000000..14bb4fb3aa67a --- /dev/null +++ b/ddl/ingest/backend_mgr.go @@ -0,0 +1,193 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "context" + "database/sql" + "fmt" + "math" + + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/errormanager" + "github.com/pingcap/tidb/br/pkg/lightning/glue" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/util/generic" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type backendCtxManager struct { + generic.SyncMap[int64, *BackendContext] + memRoot MemRoot + diskRoot DiskRoot +} + +func (m *backendCtxManager) init(memRoot MemRoot, diskRoot DiskRoot) { + m.SyncMap = generic.NewSyncMap[int64, *BackendContext](10) + m.memRoot = memRoot + m.diskRoot = diskRoot +} + +// Register creates a new backend and registers it to the backend context. +func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int64, _ mysql.SQLMode) (*BackendContext, error) { + bc, exist := m.Load(jobID) + if !exist { + m.memRoot.RefreshConsumption() + ok := m.memRoot.CheckConsume(StructSizeBackendCtx) + if !ok { + return nil, genBackendAllocMemFailedErr(m.memRoot, jobID) + } + cfg, err := generateLightningConfig(m.memRoot, jobID, unique) + if err != nil { + logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) + return nil, err + } + bd, err := createLocalBackend(ctx, cfg, glueLit{}) + if err != nil { + logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err)) + return nil, err + } + + bcCtx := newBackendContext(ctx, jobID, &bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot) + m.Store(jobID, bcCtx) + + m.memRoot.Consume(StructSizeBackendCtx) + logutil.BgLogger().Info(LitInfoCreateBackend, zap.Int64("job ID", jobID), + zap.Int64("current memory usage", m.memRoot.CurrentUsage()), + zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()), + zap.Bool("is unique index", unique)) + return bcCtx, nil + } + return bc, nil +} + +func createLocalBackend(ctx context.Context, cfg *config.Config, glue glue.Glue) (backend.Backend, error) { + tls, err := cfg.ToTLS() + if err != nil { + logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err)) + return backend.Backend{}, err + } + + errorMgr := errormanager.New(nil, cfg, log.Logger{Logger: logutil.BgLogger()}) + return local.NewLocalBackend(ctx, tls, cfg, glue, int(LitRLimit), errorMgr) +} + +func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend, + cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *BackendContext { + bc := &BackendContext{ + jobID: jobID, + backend: be, + ctx: ctx, + cfg: cfg, + sysVars: vars, + diskRoot: diskRoot, + } + bc.EngMgr.init(memRoot, diskRoot) + return bc +} + +// Unregister removes a backend context from the backend context manager. +func (m *backendCtxManager) Unregister(jobID int64) { + bc, exist := m.Load(jobID) + if !exist { + return + } + bc.EngMgr.UnregisterAll(jobID) + bc.backend.Close() + m.memRoot.Release(StructSizeBackendCtx) + m.Delete(jobID) + m.memRoot.ReleaseWithTag(encodeBackendTag(jobID)) + logutil.BgLogger().Info(LitInfoCloseBackend, zap.Int64("job ID", jobID), + zap.Int64("current memory usage", m.memRoot.CurrentUsage()), + zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota())) +} + +// TotalDiskUsage returns the total disk usage of all backends. +func (m *backendCtxManager) TotalDiskUsage() uint64 { + var totalDiskUsed uint64 + for _, key := range m.Keys() { + bc, exists := m.Load(key) + if exists { + _, _, bcDiskUsed, _ := bc.backend.CheckDiskQuota(math.MaxInt64) + totalDiskUsed += uint64(bcDiskUsed) + } + } + return totalDiskUsed +} + +// UpdateMemoryUsage collects the memory usages from all the backend and updates it to the memRoot. +func (m *backendCtxManager) UpdateMemoryUsage() { + for _, key := range m.Keys() { + bc, exists := m.Load(key) + if exists { + curSize := bc.backend.TotalMemoryConsume() + m.memRoot.ReleaseWithTag(encodeBackendTag(bc.jobID)) + m.memRoot.ConsumeWithTag(encodeBackendTag(bc.jobID), curSize) + } + } +} + +// glueLit is used as a placeholder for the local backend initialization. +type glueLit struct{} + +// OwnsSQLExecutor Implement interface OwnsSQLExecutor. +func (glueLit) OwnsSQLExecutor() bool { + return false +} + +// GetSQLExecutor Implement interface GetSQLExecutor. +func (glueLit) GetSQLExecutor() glue.SQLExecutor { + return nil +} + +// GetDB Implement interface GetDB. +func (glueLit) GetDB() (*sql.DB, error) { + return nil, nil +} + +// GetParser Implement interface GetParser. +func (glueLit) GetParser() *parser.Parser { + return nil +} + +// GetTables Implement interface GetTables. +func (glueLit) GetTables(context.Context, string) ([]*model.TableInfo, error) { + return nil, nil +} + +// GetSession Implement interface GetSession. +func (glueLit) GetSession(context.Context) (checkpoints.Session, error) { + return nil, nil +} + +// OpenCheckpointsDB Implement interface OpenCheckpointsDB. +func (glueLit) OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.DB, error) { + return nil, nil +} + +// Record is used to report some information (key, value) to host TiDB, including progress, stage currently. +func (glueLit) Record(string, uint64) { +} + +func encodeBackendTag(jobID int64) string { + return fmt.Sprintf("%d", jobID) +} diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go new file mode 100644 index 0000000000000..3a96e8ae5201b --- /dev/null +++ b/ddl/ingest/config.go @@ -0,0 +1,137 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "path/filepath" + + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/config" + tidbconf "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/size" + "go.uber.org/zap" +) + +func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) { + tidbCfg := tidbconf.GetGlobalConfig() + cfg := config.NewConfig() + cfg.TikvImporter.Backend = config.BackendLocal + // Each backend will build a single dir in lightning dir. + cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID)) + _, err := cfg.AdjustCommon() + if err != nil { + logutil.BgLogger().Warn(LitWarnConfigError, zap.Error(err)) + return nil, err + } + adjustImportMemory(memRoot, cfg) + cfg.Checkpoint.Enable = true + if unique { + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgRecord + } else { + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone + } + cfg.TiDB.PdAddr = tidbCfg.Path + cfg.TiDB.Host = "127.0.0.1" + cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort) + // Set TLS related information + cfg.Security.CAPath = tidbCfg.Security.ClusterSSLCA + cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert + cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey + + return cfg, err +} + +var ( + compactMemory = 1 * size.GB + compactConcurrency = 4 +) + +func generateLocalEngineConfig(id int64, dbName, tbName string) *backend.EngineConfig { + return &backend.EngineConfig{ + Local: &backend.LocalEngineConfig{ + Compact: true, + CompactThreshold: int64(compactMemory), + CompactConcurrency: compactConcurrency, + }, + TableInfo: &checkpoints.TidbTableInfo{ + ID: id, + DB: dbName, + Name: tbName, + }, + } +} + +// adjustImportMemory adjusts the lightning memory parameters according to the memory root's max limitation. +func adjustImportMemory(memRoot MemRoot, cfg *config.Config) { + var scale int64 + // Try aggressive resource usage successful. + if tryAggressiveMemory(memRoot, cfg) { + return + } + + defaultMemSize := int64(cfg.TikvImporter.LocalWriterMemCacheSize) * int64(cfg.TikvImporter.RangeConcurrency) + defaultMemSize += 4 * int64(cfg.TikvImporter.EngineMemCacheSize) + logutil.BgLogger().Info(LitInfoInitMemSetting, + zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)), + zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)), + zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency)) + + maxLimit := memRoot.MaxMemoryQuota() + scale = defaultMemSize / maxLimit + + if scale == 1 || scale == 0 { + return + } + + cfg.TikvImporter.LocalWriterMemCacheSize /= config.ByteSize(scale) + cfg.TikvImporter.EngineMemCacheSize /= config.ByteSize(scale) + // TODO: adjust range concurrency number to control total concurrency in the future. + logutil.BgLogger().Info(LitInfoChgMemSetting, + zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)), + zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)), + zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency)) +} + +// tryAggressiveMemory lightning memory parameters according memory root's max limitation. +func tryAggressiveMemory(memRoot MemRoot, cfg *config.Config) bool { + var defaultMemSize int64 + defaultMemSize = int64(int(cfg.TikvImporter.LocalWriterMemCacheSize) * cfg.TikvImporter.RangeConcurrency) + defaultMemSize += int64(cfg.TikvImporter.EngineMemCacheSize) + + if (defaultMemSize + memRoot.CurrentUsage()) > memRoot.MaxMemoryQuota() { + return false + } + logutil.BgLogger().Info(LitInfoChgMemSetting, + zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)), + zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)), + zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency)) + return true +} + +// defaultImportantVariables is used in obtainImportantVariables to retrieve the system +// variables from downstream which may affect KV encode result. The values record the default +// values if missing. +var defaultImportantVariables = map[string]string{ + "max_allowed_packet": "67108864", // 64MB + "div_precision_increment": "4", + "time_zone": "SYSTEM", + "lc_time_names": "en_US", + "default_week_format": "0", + "block_encryption_mode": "aes-128-ecb", + "group_concat_max_len": "1024", + "tidb_row_format_version": "1", +} diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go new file mode 100644 index 0000000000000..c1c98f3fe681a --- /dev/null +++ b/ddl/ingest/disk_root.go @@ -0,0 +1,71 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "github.com/pingcap/errors" + lcom "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" + "go.uber.org/zap" +) + +// DiskRoot is used to track the disk usage for the lightning backfill process. +type DiskRoot interface { + CurrentUsage() uint64 + MaxQuota() uint64 + UpdateUsageAndQuota() error +} + +const capacityThreshold = 0.9 + +// diskRootImpl implements DiskRoot interface. +type diskRootImpl struct { + path string + currentUsage uint64 + maxQuota uint64 + bcCtx *backendCtxManager +} + +// NewDiskRootImpl creates a new DiskRoot. +func NewDiskRootImpl(path string, bcCtx *backendCtxManager) DiskRoot { + return &diskRootImpl{ + path: path, + bcCtx: bcCtx, + } +} + +// CurrentUsage implements DiskRoot interface. +func (d *diskRootImpl) CurrentUsage() uint64 { + return d.currentUsage +} + +// MaxQuota implements DiskRoot interface. +func (d *diskRootImpl) MaxQuota() uint64 { + return d.maxQuota +} + +// UpdateUsageAndQuota implements DiskRoot interface. +func (d *diskRootImpl) UpdateUsageAndQuota() error { + d.currentUsage = d.bcCtx.TotalDiskUsage() + sz, err := lcom.GetStorageSize(d.path) + if err != nil { + logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err)) + return errors.New(LitErrGetStorageQuota) + } + d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity))) + return nil +} diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go new file mode 100644 index 0000000000000..545bdb87d3ff2 --- /dev/null +++ b/ddl/ingest/engine.go @@ -0,0 +1,184 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "context" + "strconv" + + "github.com/google/uuid" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/util/generic" + "github.com/pingcap/tidb/util/logutil" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// One engine for one index reorg task, each task will create several new writers under the +// Opened Engine. Note engineInfo is not thread safe. +type engineInfo struct { + ctx context.Context + jobID int64 + indexID int64 + openedEngine *backend.OpenedEngine + uuid uuid.UUID + cfg *backend.EngineConfig + writerCount int + writerCache generic.SyncMap[int, *backend.LocalEngineWriter] + memRoot MemRoot + diskRoot DiskRoot +} + +// NewEngineInfo create a new EngineInfo struct. +func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig, + en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo { + return &engineInfo{ + ctx: ctx, + jobID: jobID, + indexID: indexID, + cfg: cfg, + openedEngine: en, + uuid: uuid, + writerCount: wCnt, + writerCache: generic.NewSyncMap[int, *backend.LocalEngineWriter](wCnt), + memRoot: memRoot, + diskRoot: diskRoot, + } +} + +// Flush imports all the key-values in engine to the storage. +func (ei *engineInfo) Flush() error { + err := ei.openedEngine.Flush(ei.ctx) + if err != nil { + logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } + return nil +} + +func (ei *engineInfo) Clean() { + if ei.openedEngine == nil { + return + } + indexEngine := ei.openedEngine + closedEngine, err := indexEngine.Close(ei.ctx, ei.cfg) + if err != nil { + logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } + ei.openedEngine = nil + // Here the local intermediate files will be removed. + err = closedEngine.Cleanup(ei.ctx) + if err != nil { + logutil.BgLogger().Error(LitErrCleanEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } +} + +func (ei *engineInfo) ImportAndClean() error { + // Close engine and finish local tasks of lightning. + logutil.BgLogger().Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + indexEngine := ei.openedEngine + closeEngine, err1 := indexEngine.Close(ei.ctx, ei.cfg) + if err1 != nil { + logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return errors.New(LitErrCloseEngineErr) + } + ei.openedEngine = nil + + err := ei.diskRoot.UpdateUsageAndQuota() + if err != nil { + logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } + + // Ingest data to TiKV. + logutil.BgLogger().Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID), + zap.Int64("index ID", ei.indexID), + zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10))) + err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + if err != nil { + logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return errors.New(LitErrIngestDataErr) + } + + // Clean up the engine local workspace. + err = closeEngine.Cleanup(ei.ctx) + if err != nil { + logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return errors.New(LitErrCloseEngineErr) + } + return nil +} + +// WriterContext is used to keep a lightning local writer for each backfill worker. +type WriterContext struct { + ctx context.Context + lWrite *backend.LocalEngineWriter +} + +func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) { + ei.memRoot.RefreshConsumption() + ok := ei.memRoot.CheckConsume(StructSizeWriterCtx) + if !ok { + return nil, genEngineAllocMemFailedErr(ei.memRoot, ei.jobID, ei.indexID) + } + + wCtx, err := ei.newWriterContext(id) + if err != nil { + logutil.BgLogger().Error(LitErrCreateContextFail, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID), + zap.Int("worker ID", id)) + return nil, err + } + + ei.memRoot.Consume(StructSizeWriterCtx) + logutil.BgLogger().Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID), + zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id), + zap.Int64("allocate memory", StructSizeWriterCtx), + zap.Int64("current memory usage", ei.memRoot.CurrentUsage()), + zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota())) + return wCtx, err +} + +// newWriterContext will get worker local writer from engine info writer cache first, if exists. +// If local writer not exist, then create new one and store it into engine info writer cache. +// note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to +// make sure the safe. +func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { + lWrite, exist := ei.writerCache.Load(workerID) + if !exist { + var err error + lWrite, err = ei.openedEngine.LocalWriter(ei.ctx, &backend.LocalWriterConfig{}) + if err != nil { + return nil, err + } + // Cache the local writer. + ei.writerCache.Store(workerID, lWrite) + } + return &WriterContext{ + ctx: ei.ctx, + lWrite: lWrite, + }, nil +} + +// WriteRow Write one row into local writer buffer. +func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { + kvs := make([]common.KvPair, 1) + kvs[0].Key = key + kvs[0].Val = idxVal + row := kv.MakeRowsFromKvPairs(kvs) + return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row) +} diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go new file mode 100644 index 0000000000000..46272d7b852b1 --- /dev/null +++ b/ddl/ingest/engine_mgr.go @@ -0,0 +1,108 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/util/generic" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type engineManager struct { + generic.SyncMap[int64, *engineInfo] + MemRoot MemRoot + DiskRoot DiskRoot +} + +func (m *engineManager) init(memRoot MemRoot, diskRoot DiskRoot) { + m.SyncMap = generic.NewSyncMap[int64, *engineInfo](10) + m.MemRoot = memRoot + m.DiskRoot = diskRoot +} + +// Register create a new engineInfo and register it to the engineManager. +func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64) error { + // Calculate lightning concurrency degree and set memory usage + // and pre-allocate memory usage for worker. + m.MemRoot.RefreshConsumption() + ok := m.MemRoot.CheckConsume(int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) + if !ok { + return genEngineAllocMemFailedErr(m.MemRoot, bc.jobID, indexID) + } + + en, exist := m.Load(indexID) + if !exist { + engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) + ok := m.MemRoot.CheckConsume(StructSizeEngineInfo + engineCacheSize) + if !ok { + return genEngineAllocMemFailedErr(m.MemRoot, bc.jobID, indexID) + } + + cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName) + openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID)) + if err != nil { + return errors.New(LitErrCreateEngineFail) + } + id := openedEn.GetEngineUUID() + en = NewEngineInfo(bc.ctx, job.ID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot) + m.Store(indexID, en) + m.MemRoot.Consume(StructSizeEngineInfo) + m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), engineCacheSize) + } else { + if en.writerCount+1 > bc.cfg.TikvImporter.RangeConcurrency { + logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID), + zap.Int64("index ID", indexID), + zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) + return errors.New(LitErrExceedConcurrency) + } + en.writerCount++ + } + m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) + logutil.BgLogger().Info(LitInfoOpenEngine, zap.Int64("job ID", job.ID), + zap.Int64("index ID", indexID), + zap.Int64("current memory usage", m.MemRoot.CurrentUsage()), + zap.Int64("memory limitation", m.MemRoot.MaxMemoryQuota()), + zap.Int("current writer count", en.writerCount)) + return nil +} + +// Unregister delete the engineInfo from the engineManager. +func (m *engineManager) Unregister(jobID, indexID int64) { + ei, exist := m.Load(indexID) + if !exist { + return + } + + ei.Clean() + m.Delete(indexID) + m.MemRoot.ReleaseWithTag(encodeEngineTag(jobID, indexID)) + m.MemRoot.Release(StructSizeWriterCtx * int64(ei.writerCount)) + m.MemRoot.Release(StructSizeEngineInfo) +} + +// UnregisterAll delete all engineInfo from the engineManager. +func (m *engineManager) UnregisterAll(jobID int64) { + for _, idxID := range m.Keys() { + m.Unregister(jobID, idxID) + } +} + +func encodeEngineTag(jobID, indexID int64) string { + return fmt.Sprintf("%d-%d", jobID, indexID) +} diff --git a/ddl/ingest/env.go b/ddl/ingest/env.go new file mode 100644 index 0000000000000..5034b94168e0e --- /dev/null +++ b/ddl/ingest/env.go @@ -0,0 +1,121 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "os" + "path/filepath" + "strconv" + "syscall" + + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/size" + "go.uber.org/zap" +) + +var ( + // LitBackCtxMgr is the entry for the lightning backfill process. + LitBackCtxMgr backendCtxManager + // LitMemRoot is used to track the memory usage of the lightning backfill process. + LitMemRoot MemRoot + // LitDiskRoot is used to track the disk usage of the lightning backfill process. + LitDiskRoot DiskRoot + // LitRLimit is the max open file number of the lightning backfill process. + LitRLimit uint64 + // LitSortPath is the sort path for the lightning backfill process. + LitSortPath string + // LitInitialized is the flag indicates whether the lightning backfill process is initialized. + LitInitialized bool +) + +const maxMemoryQuota = 2 * size.GB + +// InitGlobalLightningEnv initialize Lightning backfill environment. +func InitGlobalLightningEnv() { + log.SetAppLogger(logutil.BgLogger()) + sPath, err := genLightningDataDir() + if err != nil { + logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err), + zap.Bool("lightning is initialized", LitInitialized)) + return + } + LitSortPath = sPath + LitMemRoot = NewMemRootImpl(int64(maxMemoryQuota), &LitBackCtxMgr) + LitDiskRoot = NewDiskRootImpl(LitSortPath, &LitBackCtxMgr) + err = LitDiskRoot.UpdateUsageAndQuota() + if err != nil { + logutil.BgLogger().Warn(LitErrUpdateDiskStats, zap.Error(err), + zap.Bool("lightning is initialized", LitInitialized)) + return + } + LitBackCtxMgr.init(LitMemRoot, LitDiskRoot) + LitRLimit = genRLimit() + LitInitialized = true + logutil.BgLogger().Info(LitInfoEnvInitSucc, + zap.Uint64("memory limitation", maxMemoryQuota), + zap.Uint64("sort path disk quota", LitDiskRoot.MaxQuota()), + zap.Uint64("max open file number", LitRLimit), + zap.Bool("lightning is initialized", LitInitialized)) +} + +func genRLimit() uint64 { + rLimit := uint64(1024) + var rl syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl) + if err != nil { + logutil.BgLogger().Warn(LitErrGetSysLimitErr, zap.Error(err), zap.String("default", "1024")) + } else { + rLimit = rl.Cur + } + return rLimit +} + +// Generate lightning local store dir in TiDB data dir. +// it will append -port to be tmp_ddl suffix. +func genLightningDataDir() (string, error) { + tidbCfg := config.GetGlobalConfig() + sortPathSuffix := "/tmp_ddl-" + strconv.Itoa(int(tidbCfg.Port)) + sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix) + + if info, err := os.Stat(sortPath); err != nil { + if !os.IsNotExist(err) { + logutil.BgLogger().Error(LitErrStatDirFail, zap.String("sort path", sortPath), zap.Error(err)) + return "", err + } + } else if info.IsDir() { + // Currently remove all dir to clean garbage data. + // TODO: when do checkpoint should change follow logic. + err := os.RemoveAll(sortPath) + if err != nil { + logutil.BgLogger().Error(LitErrDeleteDirFail, zap.String("sort path", sortPath), zap.Error(err)) + } + } + + err := os.MkdirAll(sortPath, 0o700) + if err != nil { + logutil.BgLogger().Error(LitErrCreateDirFail, zap.String("sort path", sortPath), zap.Error(err)) + return "", err + } + logutil.BgLogger().Info(LitInfoSortDir, zap.String("data path:", sortPath)) + return sortPath, nil +} + +// GenRLimitForTest is only used for test. +var GenRLimitForTest = genRLimit + +// GenLightningDataDirForTest is only used for test. +var GenLightningDataDirForTest = genLightningDataDir diff --git a/ddl/ingest/env_test.go b/ddl/ingest/env_test.go new file mode 100644 index 0000000000000..90d37ea5c3101 --- /dev/null +++ b/ddl/ingest/env_test.go @@ -0,0 +1,35 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest_test + +import ( + "testing" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl/ingest" + "github.com/stretchr/testify/require" +) + +func TestGenLightningDataDir(t *testing.T) { + tmpDir := t.TempDir() + port, iPort := "5678", uint(5678) + config.UpdateGlobal(func(conf *config.Config) { + conf.TempDir = tmpDir + conf.Port = iPort + }) + sPath, err := ingest.GenLightningDataDirForTest() + require.NoError(t, err) + require.Equal(t, tmpDir+"/tmp_ddl-"+port, sPath) +} diff --git a/ddl/lightning/mem_root.go b/ddl/ingest/mem_root.go similarity index 68% rename from ddl/lightning/mem_root.go rename to ddl/ingest/mem_root.go index 946b429852529..a36d934c0abcd 100644 --- a/ddl/lightning/mem_root.go +++ b/ddl/ingest/mem_root.go @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package lightning +package ingest import ( "sync" + "unsafe" ) // MemRoot is used to track the memory usage for the lightning backfill process. type MemRoot interface { Consume(size int64) Release(size int64) - TestConsume(size int64) bool + CheckConsume(size int64) bool ConsumeWithTag(tag string, size int64) ReleaseWithTag(tag string) @@ -30,22 +31,40 @@ type MemRoot interface { MaxMemoryQuota() int64 CurrentUsage() int64 CurrentUsageWithTag(tag string) int64 + RefreshConsumption() +} + +var ( + // StructSizeBackendCtx is the size of BackendContext. + StructSizeBackendCtx int64 + // StructSizeEngineInfo is the size of EngineInfo. + StructSizeEngineInfo int64 + // StructSizeWriterCtx is the size of WriterContext. + StructSizeWriterCtx int64 +) + +func init() { + StructSizeBackendCtx = int64(unsafe.Sizeof(BackendContext{})) + StructSizeEngineInfo = int64(unsafe.Sizeof(engineInfo{})) + StructSizeWriterCtx = int64(unsafe.Sizeof(WriterContext{})) } // memRootImpl is an implementation of MemRoot. type memRootImpl struct { - maxLimit int64 - currUsage int64 - structSize map[string]int64 - mu sync.RWMutex + maxLimit int64 + currUsage int64 + structSize map[string]int64 + backendCtxMgr *backendCtxManager + mu sync.RWMutex } // NewMemRootImpl creates a new memRootImpl. -func NewMemRootImpl(maxQuota int64) *memRootImpl { +func NewMemRootImpl(maxQuota int64, bcCtxMgr *backendCtxManager) *memRootImpl { return &memRootImpl{ - maxLimit: maxQuota, - currUsage: 0, - structSize: make(map[string]int64, 10), + maxLimit: maxQuota, + currUsage: 0, + structSize: make(map[string]int64, 10), + backendCtxMgr: bcCtxMgr, } } @@ -96,11 +115,15 @@ func (m *memRootImpl) ConsumeWithTag(tag string, size int64) { m.mu.Lock() defer m.mu.Unlock() m.currUsage += size + if s, ok := m.structSize[tag]; ok { + m.structSize[tag] = s + size + return + } m.structSize[tag] = size } // TestConsume implements MemRoot. -func (m *memRootImpl) TestConsume(size int64) bool { +func (m *memRootImpl) CheckConsume(size int64) bool { m.mu.RLock() defer m.mu.RUnlock() return m.currUsage+size <= m.maxLimit @@ -113,3 +136,8 @@ func (m *memRootImpl) ReleaseWithTag(tag string) { m.currUsage -= m.structSize[tag] delete(m.structSize, tag) } + +// RefreshConsumption implements MemRoot. +func (m *memRootImpl) RefreshConsumption() { + m.backendCtxMgr.UpdateMemoryUsage() +} diff --git a/ddl/lightning/mem_root_test.go b/ddl/ingest/mem_root_test.go similarity index 76% rename from ddl/lightning/mem_root_test.go rename to ddl/ingest/mem_root_test.go index 08353489ac504..43bc36d13455e 100644 --- a/ddl/lightning/mem_root_test.go +++ b/ddl/ingest/mem_root_test.go @@ -12,35 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -package lightning_test +package ingest_test import ( "testing" - "github.com/pingcap/tidb/ddl/lightning" + "github.com/pingcap/tidb/ddl/ingest" "github.com/stretchr/testify/require" ) func TestMemoryRoot(t *testing.T) { - memRoot := lightning.MemRoot(lightning.NewMemRootImpl(1024)) + memRoot := ingest.MemRoot(ingest.NewMemRootImpl(1024, nil)) require.Equal(t, int64(1024), memRoot.MaxMemoryQuota()) require.Equal(t, int64(0), memRoot.CurrentUsage()) - require.True(t, memRoot.TestConsume(1023)) - require.True(t, memRoot.TestConsume(1024)) - require.False(t, memRoot.TestConsume(1025)) + require.True(t, memRoot.CheckConsume(1023)) + require.True(t, memRoot.CheckConsume(1024)) + require.False(t, memRoot.CheckConsume(1025)) memRoot.Consume(512) require.Equal(t, int64(512), memRoot.CurrentUsage()) - require.True(t, memRoot.TestConsume(512)) - require.False(t, memRoot.TestConsume(513)) + require.True(t, memRoot.CheckConsume(512)) + require.False(t, memRoot.CheckConsume(513)) require.Equal(t, int64(1024), memRoot.MaxMemoryQuota()) memRoot.Release(10) require.Equal(t, int64(502), memRoot.CurrentUsage()) require.Equal(t, int64(1024), memRoot.MaxMemoryQuota()) memRoot.SetMaxMemoryQuota(512) - require.False(t, memRoot.TestConsume(20)) // 502+20 > 512 + require.False(t, memRoot.CheckConsume(20)) // 502+20 > 512 memRoot.Release(502) require.Equal(t, int64(0), memRoot.CurrentUsage()) @@ -48,13 +48,13 @@ func TestMemoryRoot(t *testing.T) { memRoot.ConsumeWithTag("a", 512) memRoot.ConsumeWithTag("b", 512) require.Equal(t, int64(1024), memRoot.CurrentUsage()) - require.False(t, memRoot.TestConsume(1)) + require.False(t, memRoot.CheckConsume(1)) memRoot.ReleaseWithTag("a") require.Equal(t, int64(512), memRoot.CurrentUsage()) memRoot.ReleaseWithTag("a") // Double release. require.Equal(t, int64(512), memRoot.CurrentUsage()) - require.True(t, memRoot.TestConsume(10)) + require.True(t, memRoot.CheckConsume(10)) memRoot.Consume(10) // Mix usage of tag and non-tag. require.Equal(t, int64(522), memRoot.CurrentUsage()) } diff --git a/ddl/ingest/message.go b/ddl/ingest/message.go new file mode 100644 index 0000000000000..45b6a1c000eb8 --- /dev/null +++ b/ddl/ingest/message.go @@ -0,0 +1,74 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +// Message const text +const ( + LitErrAllocMemFail string = "[ddl-lightning] allocate memory failed" + LitErrOutMaxMem string = "[ddl-lightning] memory used up for lightning add index" + LitErrCreateDirFail string = "[ddl-lightning] create lightning sort path error" + LitErrStatDirFail string = "[ddl-lightning] stat lightning sort path error" + LitErrDeleteDirFail string = "[ddl-lightning] delete lightning sort path error" + LitErrCreateBackendFail string = "[ddl-lightning] build lightning backend failed, will use kernel index reorg method to backfill the index" + LitErrCreateEngineFail string = "[ddl-lightning] build lightning engine failed, will use kernel index reorg method to backfill the index" + LitErrCreateContextFail string = "[ddl-lightning] build lightning worker context failed, will use kernel index reorg method to backfill the index" + LitErrGetEngineFail string = "[ddl-lightning] can not get cached engine info" + LitErrGetStorageQuota string = "[ddl-lightning] get storage quota error" + LitErrGetSysLimitErr string = "[ddl-lightning] get system open file limit error" + LitErrCloseEngineErr string = "[ddl-lightning] close engine error" + LitErrCleanEngineErr string = "[ddl-lightning] clean engine error" + LitErrFlushEngineErr string = "[ddl-lightning] flush engine data err" + LitErrIngestDataErr string = "[ddl-lightning] ingest data into storage error" + LitErrRemoteDupExistErr string = "[ddl-lightning] remote duplicate index key exist" + LitErrExceedConcurrency string = "[ddl-lightning] the concurrency is greater than lightning limit(tikv-importer.range-concurrency)" + LitErrUpdateDiskStats string = "[ddl-lightning] update disk usage error" + LitWarnEnvInitFail string = "[ddl-lightning] initialize environment failed" + LitWarnConfigError string = "[ddl-lightning] build config for backend failed" + LitWarnGenMemLimit string = "[ddl-lightning] generate memory max limitation" + LitInfoEnvInitSucc string = "[ddl-lightning] init global lightning backend environment finished" + LitInfoSortDir string = "[ddl-lightning] the lightning sorted dir" + LitInfoCreateBackend string = "[ddl-lightning] create one backend for an DDL job" + LitInfoCloseBackend string = "[ddl-lightning] close one backend for DDL job" + LitInfoOpenEngine string = "[ddl-lightning] open an engine for index reorg task" + LitInfoCreateWrite string = "[ddl-lightning] create one local Writer for Index reorg task" + LitInfoCloseEngine string = "[ddl-lightning] flush all writer and get closed engine" + LitInfoRemoteDupCheck string = "[ddl-lightning] start remote duplicate checking" + LitInfoStartImport string = "[ddl-lightning] start to import data" + LitInfoSetMemLimit string = "[ddl-lightning] set max memory limitation" + LitInfoChgMemSetting string = "[ddl-lightning] change memory setting for lightning" + LitInfoInitMemSetting string = "[ddl-lightning] initial memory setting for lightning" + LitInfoUnsafeImport string = "[ddl-lightning] do a partial import data into the storage" +) + +func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error { + logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID), + zap.Int64("current memory usage", memRoot.CurrentUsage()), + zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) + return errors.New(LitErrOutMaxMem) +} + +func genEngineAllocMemFailedErr(memRoot MemRoot, jobID, idxID int64) error { + logutil.BgLogger().Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID), + zap.Int64("index ID", idxID), + zap.Int64("current memory usage", memRoot.CurrentUsage()), + zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) + return errors.New(LitErrOutMaxMem) +} diff --git a/ddl/lightning/BUILD.bazel b/ddl/lightning/BUILD.bazel deleted file mode 100644 index 829f6cbaab76f..0000000000000 --- a/ddl/lightning/BUILD.bazel +++ /dev/null @@ -1,18 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "lightning", - srcs = ["mem_root.go"], - importpath = "github.com/pingcap/tidb/ddl/lightning", - visibility = ["//visibility:public"], -) - -go_test( - name = "lightning_test", - srcs = ["mem_root_test.go"], - flaky = True, - deps = [ - ":lightning", - "@com_github_stretchr_testify//require", - ], -) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 192b81b0b442d..5554e61d746f4 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1778,9 +1778,9 @@ var defaultSysVars = []*SysVar{ }}, // This system var is set disk quota for lightning sort dir, from 100 GB to 1PB. {Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(sv *SessionVars) (string, error) { - return strconv.FormatInt(DDLDiskQuota.Load(), 10), nil + return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil }, SetGlobal: func(s *SessionVars, val string) error { - DDLDiskQuota.Store(TidbOptInt64(val, DefTiDBDDLDiskQuota)) + DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota)) return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(DefTiDBConstraintCheckInPlacePessimistic), Type: TypeBool, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ee23695888fb1..8ace83b279fa4 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1077,7 +1077,7 @@ var ( // EnableFastReorg indicates whether to use lightning to enhance DDL reorg performance. EnableFastReorg = atomic.NewBool(DefTiDBEnableFastReorg) // DDLDiskQuota is the temporary variable for set disk quota for lightning - DDLDiskQuota = atomic.NewInt64(DefTiDBDDLDiskQuota) + DDLDiskQuota = atomic.NewUint64(DefTiDBDDLDiskQuota) ) var ( diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 58a6701273d5b..72b6a8eec2ff3 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -318,6 +318,15 @@ func TidbOptInt64(opt string, defaultVal int64) int64 { return val } +// TidbOptUint64 converts a string to an uint64. +func TidbOptUint64(opt string, defaultVal uint64) uint64 { + val, err := strconv.ParseUint(opt, 10, 64) + if err != nil { + return defaultVal + } + return val +} + func tidbOptFloat64(opt string, defaultVal float64) float64 { val, err := strconv.ParseFloat(opt, 64) if err != nil {