From 8ca32de94f53863d1eddc339fdf5676b864644ae Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 21 Dec 2022 15:59:18 +0800 Subject: [PATCH 1/5] fix race Signed-off-by: you06 set request source in next calling Signed-off-by: you06 more request source check for testing Signed-off-by: you06 --- ddl/ddl.go | 8 ++++++-- ddl/ddl_worker.go | 5 ++++- ddl/job_table.go | 3 ++- kv/txn.go | 28 +++++++++++++++------------- session/session.go | 28 ++++++++++++++++------------ 5 files changed, 43 insertions(+), 29 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8c4d5235ea7ad..1e1b38eeb77bb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -439,9 +439,9 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { ctx, exists := dc.jobCtx.jobCtxMap[job.ID] if !exists { ctx = NewJobContext() - ctx.setDDLLabelForDiagnosis(job) dc.jobCtx.jobCtxMap[job.ID] = ctx } + ctx.setDDLLabelForDiagnosis(job) } func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger { @@ -1786,7 +1786,11 @@ func (s *session) execute(ctx context.Context, query string, label string) ([]ch defer func() { metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) }() - rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query) + + if ctx.Value(kv.RequestSourceKey) == nil { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) + } + rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query) if err != nil { return nil, errors.Trace(err) } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index dac2f01216edb..35fa9be696e49 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -119,7 +119,7 @@ func NewJobContext() *JobContext { cacheSQL: "", cacheNormalizedSQL: "", cacheDigest: nil, - tp: "unknown", + tp: "", } } @@ -760,6 +760,9 @@ func getDDLRequestSource(job *model.Job) string { } func (w *JobContext) setDDLLabelForDiagnosis(job *model.Job) { + if w.tp != "" { + return + } w.tp = getDDLRequestSource(job) w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType()) } diff --git a/ddl/job_table.go b/ddl/job_table.go index 894c4c45b8380..238e95362a960 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -395,7 +395,8 @@ func updateDDLJob2Table(sctx *session, job *model.Job, updateRawArgs bool) error // getDDLReorgHandle gets DDL reorg handle. func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID) - rows, err := sess.execute(context.Background(), sql, "get_handle") + ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job)) + rows, err := sess.execute(ctx, sql, "get_handle") if err != nil { return nil, nil, nil, 0, err } diff --git a/kv/txn.go b/kv/txn.go index d7828c7fb3138..035f2aa662eca 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -195,20 +195,22 @@ func BackOff(attempts uint) int { func setRequestSourceForInnerTxn(ctx context.Context, txn Transaction) { if source := ctx.Value(RequestSourceKey); source != nil { requestSource := source.(RequestSource) - if !requestSource.RequestSourceInternal { - logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only") + if requestSource.RequestSourceType != "" { + if !requestSource.RequestSourceInternal { + logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only") + } + txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal) + txn.SetOption(RequestSourceType, requestSource.RequestSourceType) + return } - txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal) - txn.SetOption(RequestSourceType, requestSource.RequestSourceType) + } + // panic in test mode in case there are requests without source in the future. + // log warnings in production mode. + if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { + panic("unexpected no source type context, if you see this error, " + + "the `RequestSourceTypeKey` is missing in your context") } else { - // panic in test mode in case there are requests without source in the future. - // log warnings in production mode. - if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { - panic("unexpected no source type context, if you see this error, " + - "the `RequestSourceTypeKey` is missing in your context") - } else { - logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " + - "the `RequestSourceTypeKey` is missing in the context") - } + logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " + + "the `RequestSourceTypeKey` is missing in the context") } } diff --git a/session/session.go b/session/session.go index e681606323fe1..7feb82d2a873d 100644 --- a/session/session.go +++ b/session/session.go @@ -4112,23 +4112,27 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo } else { s.sessionVars.RequestSourceType = stmtLabel } + return } else { if source := ctx.Value(kv.RequestSourceKey); source != nil { - s.sessionVars.RequestSourceType = source.(kv.RequestSource).RequestSourceType - } else { - // panic in test mode in case there are requests without source in the future. - // log warnings in production mode. - if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { - panic("unexpected no source type context, if you see this error, " + - "the `RequestSourceTypeKey` is missing in your context") - } else { - logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+ - "the `RequestSourceTypeKey` is missing in the context", - zap.Bool("internal", s.isInternal()), - zap.String("sql", stmtNode.Text())) + requestSource := source.(kv.RequestSource) + if requestSource.RequestSourceType != "" { + s.sessionVars.RequestSourceType = requestSource.RequestSourceType + return } } } + // panic in test mode in case there are requests without source in the future. + // log warnings in production mode. + if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { + panic("unexpected no source type context, if you see this error, " + + "the `RequestSourceTypeKey` is missing in your context") + } else { + logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+ + "the `RequestSourceTypeKey` is missing in the context", + zap.Bool("internal", s.isInternal()), + zap.String("sql", stmtNode.Text())) + } } // RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver. From cddd223dd3fbf2ea8223a573baeb41bc1ffc5f3e Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 21 Dec 2022 20:54:48 +0800 Subject: [PATCH 2/5] lint Signed-off-by: you06 --- session/session.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/session/session.go b/session/session.go index 7feb82d2a873d..6c13ba727803a 100644 --- a/session/session.go +++ b/session/session.go @@ -4113,13 +4113,12 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo s.sessionVars.RequestSourceType = stmtLabel } return - } else { - if source := ctx.Value(kv.RequestSourceKey); source != nil { - requestSource := source.(kv.RequestSource) - if requestSource.RequestSourceType != "" { - s.sessionVars.RequestSourceType = requestSource.RequestSourceType - return - } + } + if source := ctx.Value(kv.RequestSourceKey); source != nil { + requestSource := source.(kv.RequestSource) + if requestSource.RequestSourceType != "" { + s.sessionVars.RequestSourceType = requestSource.RequestSourceType + return } } // panic in test mode in case there are requests without source in the future. From acac3bb62f5ddadff7b81afdbb371d050cc1b141 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 22 Dec 2022 09:56:44 +0800 Subject: [PATCH 3/5] fix test Signed-off-by: you06 --- executor/simple_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/simple_test.go b/executor/simple_test.go index 13a439ad64d46..102385a773b7d 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -20,6 +20,8 @@ import ( "strconv" "testing" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/mysql" @@ -27,7 +29,6 @@ import ( "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" - tikvutil "github.com/tikv/client-go/v2/util" ) func TestKillStmt(t *testing.T) { @@ -86,7 +87,7 @@ func TestKillStmt(t *testing.T) { func TestUserAttributes(t *testing.T) { store, _ := testkit.CreateMockStoreAndDomain(t) rootTK := testkit.NewTestKit(t, store) - ctx := context.WithValue(context.Background(), tikvutil.RequestSourceKey, tikvutil.RequestSource{RequestSourceInternal: true}) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) // https://dev.mysql.com/doc/refman/8.0/en/create-user.html#create-user-comments-attributes rootTK.MustExec(`CREATE USER testuser COMMENT '1234'`) From cc40bf17d0b8e2716717bbb5c139a678b1cd6a29 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 22 Dec 2022 10:04:26 +0800 Subject: [PATCH 4/5] lint Signed-off-by: you06 --- executor/simple_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/executor/simple_test.go b/executor/simple_test.go index 102385a773b7d..895ea7109ac4a 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -20,9 +20,8 @@ import ( "strconv" "testing" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/server" From 79d2945c5b1e98db1311d5c61a6c76fcc9b2c0bc Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 14 Feb 2023 14:07:20 +0800 Subject: [PATCH 5/5] handle table scan when reorg Signed-off-by: you06 --- ddl/backfilling.go | 6 +++++- ddl/index_cop.go | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 5ea026184f13a..24bfa579ad242 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -155,6 +155,7 @@ type reorgBackfillTask struct { startKey kv.Key endKey kv.Key endInclude bool + source string } func (r *reorgBackfillTask) excludedEndKey() kv.Key { @@ -498,6 +499,7 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, } // Build reorg tasks. job := reorgInfo.Job + source := getDDLRequestSource(job) for i, keyRange := range kvRanges { endKey := keyRange.EndKey endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey) @@ -515,7 +517,9 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, startKey: keyRange.StartKey, endKey: endKey, // If the boundaries overlap, we should ignore the preceding endKey. - endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} + endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1, + source: source, + } batchTasks = append(batchTasks, task) if len(batchTasks) >= backfillTaskChanSize { diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 1220d99c78192..56bc7ab42ac25 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -137,7 +137,8 @@ func (c *copReqSender) run() { p.resultsCh <- idxRecResult{id: task.id, err: err} return } - rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey()) + ctx := kv.WithInternalSourceType(p.ctx, task.source) + rs, err := p.copCtx.buildTableScan(ctx, ver.Ver, task.startKey, task.excludedEndKey()) if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} return @@ -422,6 +423,12 @@ func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()). SetConcurrency(1). Build() + builder.RequestSource.RequestSourceInternal = true + if source := ctx.Value(kv.RequestSourceKey); source != nil { + builder.RequestSource.RequestSourceType = source.(kv.RequestSource).RequestSourceType + } else { + builder.RequestSource.RequestSourceType = kv.InternalTxnDDL + } if err != nil { return nil, err }