From c88eb86638d98ca0a2464e0b97813b15368bf8fd Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 18 Oct 2023 15:15:29 +0800 Subject: [PATCH] ddl: refine job_type of admin show ddl jobs (#47699) ref pingcap/tidb#45719 --- pkg/ddl/ddl_worker.go | 1 + pkg/ddl/index.go | 10 +++++++++- pkg/ddl/multi_schema_change.go | 1 + pkg/executor/executor.go | 18 ++++++++++++++++-- pkg/parser/model/ddl.go | 2 ++ pkg/parser/model/reorg.go | 1 + 6 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 029c16f00f058..96d59251ff32b 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -117,6 +117,7 @@ type JobContext struct { tp string resourceGroupName string + cloudStorageURI string } // NewJobContext returns a new ddl job context. diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 7e8a76c823ec4..b0f888b53d90e 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -644,6 +644,7 @@ SwitchIndexState: } return ver, err } + loadCloudStorageURI(w, job) if reorgTp.NeedMergeProcess() { // Increase telemetryAddIndexIngestUsage telemetryAddIndexIngestUsage.Inc() @@ -791,6 +792,12 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt return model.ReorgTypeTxnMerge, nil } +func loadCloudStorageURI(w *worker, job *model.Job) { + jc := w.jobContext(job.ID, job.ReorgMeta) + jc.cloudStorageURI = variable.CloudStorageURI.Load() + job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 +} + // cleanupSortPath is used to clean up the temp data of the previous jobs. // Because we don't remove all the files after the support of checkpoint, // there maybe some stale files in the sort path if TiDB is killed during the backfill process. @@ -2098,11 +2105,12 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error { elemIDs = append(elemIDs, elem.ID) } + job := reorgInfo.Job taskMeta := &BackfillGlobalMeta{ Job: *reorgInfo.Job.Clone(), EleIDs: elemIDs, EleTypeKey: reorgInfo.currElement.TypeKey, - CloudStorageURI: variable.CloudStorageURI.Load(), + CloudStorageURI: w.jobContext(job.ID, job.ReorgMeta).cloudStorageURI, } metaData, err := json.Marshal(taskMeta) diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 1380aeb25241f..9956446910e7a 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -198,6 +198,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { Revertible: true, CtxVars: job.CtxVars, ReorgTp: reorgTp, + UseCloud: false, }) return nil } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 9eca11f41cac2..34c10220cb66b 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -519,10 +519,17 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che func showAddIdxReorgTp(job *model.Job) string { if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey { if job.ReorgMeta != nil { + sb := strings.Builder{} tp := job.ReorgMeta.ReorgTp.String() if len(tp) > 0 { - return " /* " + tp + " */" + sb.WriteString(" /* ") + sb.WriteString(tp) + if job.ReorgMeta.UseCloudStorage { + sb.WriteString(" cloud") + } + sb.WriteString(" */") } + return sb.String() } } return "" @@ -530,10 +537,17 @@ func showAddIdxReorgTp(job *model.Job) string { func showAddIdxReorgTpInSubJob(subJob *model.SubJob) string { if subJob.Type == model.ActionAddIndex || subJob.Type == model.ActionAddPrimaryKey { + sb := strings.Builder{} tp := subJob.ReorgTp.String() if len(tp) > 0 { - return " /* " + tp + " */" + sb.WriteString(" /* ") + sb.WriteString(tp) + if subJob.UseCloud { + sb.WriteString(" cloud") + } + sb.WriteString(" */") } + return sb.String() } return "" } diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go index ec12b06bde7ae..f0aa1af73e1c8 100644 --- a/pkg/parser/model/ddl.go +++ b/pkg/parser/model/ddl.go @@ -301,6 +301,7 @@ type SubJob struct { CtxVars []interface{} `json:"-"` SchemaVer int64 `json:"schema_version"` ReorgTp ReorgType `json:"reorg_tp"` + UseCloud bool `json:"use_cloud"` } // IsNormal returns true if the sub-job is normally running. @@ -369,6 +370,7 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) { sub.RowCount = proxyJob.RowCount sub.SchemaVer = ver sub.ReorgTp = proxyJob.ReorgMeta.ReorgTp + sub.UseCloud = proxyJob.ReorgMeta.UseCloudStorage } // JobMeta is meta info of Job. diff --git a/pkg/parser/model/reorg.go b/pkg/parser/model/reorg.go index 07f702b9b916e..927f842e20f13 100644 --- a/pkg/parser/model/reorg.go +++ b/pkg/parser/model/reorg.go @@ -29,6 +29,7 @@ type DDLReorgMeta struct { Location *TimeZoneLocation `json:"location"` ReorgTp ReorgType `json:"reorg_tp"` IsDistReorg bool `json:"is_dist_reorg"` + UseCloudStorage bool `json:"use_cloud_storage"` ResourceGroupName string `json:"resource_group_name"` }