diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 5ea026184f13a..24bfa579ad242 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -155,6 +155,7 @@ type reorgBackfillTask struct { startKey kv.Key endKey kv.Key endInclude bool + source string } func (r *reorgBackfillTask) excludedEndKey() kv.Key { @@ -498,6 +499,7 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, } // Build reorg tasks. job := reorgInfo.Job + source := getDDLRequestSource(job) for i, keyRange := range kvRanges { endKey := keyRange.EndKey endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey) @@ -515,7 +517,9 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, startKey: keyRange.StartKey, endKey: endKey, // If the boundaries overlap, we should ignore the preceding endKey. - endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} + endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1, + source: source, + } batchTasks = append(batchTasks, task) if len(batchTasks) >= backfillTaskChanSize { diff --git a/ddl/ddl.go b/ddl/ddl.go index 8c4d5235ea7ad..1e1b38eeb77bb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -439,9 +439,9 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { ctx, exists := dc.jobCtx.jobCtxMap[job.ID] if !exists { ctx = NewJobContext() - ctx.setDDLLabelForDiagnosis(job) dc.jobCtx.jobCtxMap[job.ID] = ctx } + ctx.setDDLLabelForDiagnosis(job) } func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger { @@ -1786,7 +1786,11 @@ func (s *session) execute(ctx context.Context, query string, label string) ([]ch defer func() { metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) }() - rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query) + + if ctx.Value(kv.RequestSourceKey) == nil { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) + } + rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query) if err != nil { return nil, errors.Trace(err) } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index dac2f01216edb..35fa9be696e49 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -119,7 +119,7 @@ func NewJobContext() *JobContext { cacheSQL: "", cacheNormalizedSQL: "", cacheDigest: nil, - tp: "unknown", + tp: "", } } @@ -760,6 +760,9 @@ func getDDLRequestSource(job *model.Job) string { } func (w *JobContext) setDDLLabelForDiagnosis(job *model.Job) { + if w.tp != "" { + return + } w.tp = getDDLRequestSource(job) w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType()) } diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 1220d99c78192..56bc7ab42ac25 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -137,7 +137,8 @@ func (c *copReqSender) run() { p.resultsCh <- idxRecResult{id: task.id, err: err} return } - rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey()) + ctx := kv.WithInternalSourceType(p.ctx, task.source) + rs, err := p.copCtx.buildTableScan(ctx, ver.Ver, task.startKey, task.excludedEndKey()) if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} return @@ -422,6 +423,12 @@ func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()). SetConcurrency(1). Build() + builder.RequestSource.RequestSourceInternal = true + if source := ctx.Value(kv.RequestSourceKey); source != nil { + builder.RequestSource.RequestSourceType = source.(kv.RequestSource).RequestSourceType + } else { + builder.RequestSource.RequestSourceType = kv.InternalTxnDDL + } if err != nil { return nil, err } diff --git a/ddl/job_table.go b/ddl/job_table.go index 894c4c45b8380..238e95362a960 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -395,7 +395,8 @@ func updateDDLJob2Table(sctx *session, job *model.Job, updateRawArgs bool) error // getDDLReorgHandle gets DDL reorg handle. func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID) - rows, err := sess.execute(context.Background(), sql, "get_handle") + ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job)) + rows, err := sess.execute(ctx, sql, "get_handle") if err != nil { return nil, nil, nil, 0, err } diff --git a/executor/simple_test.go b/executor/simple_test.go index 13a439ad64d46..895ea7109ac4a 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -21,13 +21,13 @@ import ( "testing" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" - tikvutil "github.com/tikv/client-go/v2/util" ) func TestKillStmt(t *testing.T) { @@ -86,7 +86,7 @@ func TestKillStmt(t *testing.T) { func TestUserAttributes(t *testing.T) { store, _ := testkit.CreateMockStoreAndDomain(t) rootTK := testkit.NewTestKit(t, store) - ctx := context.WithValue(context.Background(), tikvutil.RequestSourceKey, tikvutil.RequestSource{RequestSourceInternal: true}) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) // https://dev.mysql.com/doc/refman/8.0/en/create-user.html#create-user-comments-attributes rootTK.MustExec(`CREATE USER testuser COMMENT '1234'`) diff --git a/kv/txn.go b/kv/txn.go index d7828c7fb3138..035f2aa662eca 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -195,20 +195,22 @@ func BackOff(attempts uint) int { func setRequestSourceForInnerTxn(ctx context.Context, txn Transaction) { if source := ctx.Value(RequestSourceKey); source != nil { requestSource := source.(RequestSource) - if !requestSource.RequestSourceInternal { - logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only") + if requestSource.RequestSourceType != "" { + if !requestSource.RequestSourceInternal { + logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only") + } + txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal) + txn.SetOption(RequestSourceType, requestSource.RequestSourceType) + return } - txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal) - txn.SetOption(RequestSourceType, requestSource.RequestSourceType) + } + // panic in test mode in case there are requests without source in the future. + // log warnings in production mode. + if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { + panic("unexpected no source type context, if you see this error, " + + "the `RequestSourceTypeKey` is missing in your context") } else { - // panic in test mode in case there are requests without source in the future. - // log warnings in production mode. - if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { - panic("unexpected no source type context, if you see this error, " + - "the `RequestSourceTypeKey` is missing in your context") - } else { - logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " + - "the `RequestSourceTypeKey` is missing in the context") - } + logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " + + "the `RequestSourceTypeKey` is missing in the context") } } diff --git a/session/session.go b/session/session.go index e681606323fe1..6c13ba727803a 100644 --- a/session/session.go +++ b/session/session.go @@ -4112,23 +4112,26 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo } else { s.sessionVars.RequestSourceType = stmtLabel } - } else { - if source := ctx.Value(kv.RequestSourceKey); source != nil { - s.sessionVars.RequestSourceType = source.(kv.RequestSource).RequestSourceType - } else { - // panic in test mode in case there are requests without source in the future. - // log warnings in production mode. - if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { - panic("unexpected no source type context, if you see this error, " + - "the `RequestSourceTypeKey` is missing in your context") - } else { - logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+ - "the `RequestSourceTypeKey` is missing in the context", - zap.Bool("internal", s.isInternal()), - zap.String("sql", stmtNode.Text())) - } + return + } + if source := ctx.Value(kv.RequestSourceKey); source != nil { + requestSource := source.(kv.RequestSource) + if requestSource.RequestSourceType != "" { + s.sessionVars.RequestSourceType = requestSource.RequestSourceType + return } } + // panic in test mode in case there are requests without source in the future. + // log warnings in production mode. + if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { + panic("unexpected no source type context, if you see this error, " + + "the `RequestSourceTypeKey` is missing in your context") + } else { + logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+ + "the `RequestSourceTypeKey` is missing in the context", + zap.Bool("internal", s.isInternal()), + zap.String("sql", stmtNode.Text())) + } } // RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.