Skip to content

Commit

Permalink
ddl: move file management into litBackendCtxMgr (#52645)
Browse files Browse the repository at this point in the history
close #52639
  • Loading branch information
lance6716 authored Apr 22, 2024
1 parent aa2c694 commit 79c1499
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 169 deletions.
15 changes: 8 additions & 7 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,13 +863,14 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
// Start some background routine to manage TiFlash replica.
d.wg.Run(d.PollTiFlashRoutine)

ctx, err := d.sessPool.Get()
if err != nil {
return err
}
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv()
ingest.InitGlobalLightningEnv(func(jobIDs []int64) ([]int64, error) {
se, err := d.sessPool.Get()
if err != nil {
return nil, err
}
defer d.sessPool.Put(se)
return filterProcessingJobIDs(sess.NewSession(se), jobIDs)
})

return nil
}
Expand Down
55 changes: 1 addition & 54 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"slices"
"strings"
"sync/atomic"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
Expand Down Expand Up @@ -805,16 +803,11 @@ func pickBackfillType(ctx context.Context, job *model.Job) (model.ReorgType, err
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge, nil
}
available, err := ingest.LitBackCtxMgr.CheckAvailable()
available, err := ingest.LitBackCtxMgr.CheckMoreTasksAvailable(ctx)
if err != nil {
return model.ReorgTypeNone, err
}
if available {
ctx := logutil.WithCategory(ctx, "ddl-ingest")
err = cleanupSortPath(ctx, job.ID)
if err != nil {
return model.ReorgTypeNone, err
}
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge, nil
}
Expand All @@ -832,52 +825,6 @@ func loadCloudStorageURI(w *worker, job *model.Job) {
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
}

// cleanupSortPath is used to clean up the temp data of the previous jobs.
// Because we don't remove all the files after the support of checkpoint,
// there maybe some stale files in the sort path if TiDB is killed during the backfill process.
func cleanupSortPath(ctx context.Context, currentJobID int64) error {
sortPath := ingest.ConfigSortPath()
err := os.MkdirAll(sortPath, 0700)
if err != nil {
return errors.Trace(err)
}
entries, err := os.ReadDir(sortPath)
if err != nil {
logutil.Logger(ctx).Warn(ingest.LitErrReadSortPath, zap.Error(err))
return errors.Trace(err)
}
for _, entry := range entries {
if !entry.IsDir() {
continue
}
jobID, err := ingest.DecodeBackendTag(entry.Name())
if err != nil {
logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
continue
}
if _, ok := ingest.LitBackCtxMgr.Load(jobID); ok {
// The job is still running, skip it.
logutil.Logger(ctx).Warn("the job is still running, skip removing it",
zap.Int64("running job ID", jobID))
continue
}
// Remove all the temp data of the previous done jobs.
if jobID < currentJobID {
logutil.Logger(ctx).Info("remove stale temp index data",
zap.Int64("jobID", jobID), zap.Int64("currentJobID", currentJobID))
err := os.RemoveAll(filepath.Join(sortPath, entry.Name()))
if err != nil {
logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
return nil
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
close(local.WaitRMFolderChForTest)
})
}
}
return nil
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_x_exp//maps",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
Loading

0 comments on commit 79c1499

Please sign in to comment.