From 544dcfbf71def4eacd8f1be4513cf4813227007c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 16 Feb 2023 15:02:10 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #41460 Signed-off-by: ti-chi-bot --- ddl/backfilling.go | 27 ++++-- .../addindextest/integration_test.go | 82 +++++++++++++++++++ 2 files changed, 101 insertions(+), 8 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 8fc6f552e60d3..78fcebcc739e9 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -609,7 +609,12 @@ type backfillScheduler struct { copReqSenderPool *copReqSenderPool // for add index in ingest way. } -const backfillTaskChanSize = 1024 +var backfillTaskChanSize = 1024 + +// SetBackfillTaskChanSizeForTest is only used for test. +func SetBackfillTaskChanSizeForTest(n int) { + backfillTaskChanSize = n +} func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool, tp backfillWorkerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column, @@ -830,8 +835,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - scheduler.setMaxWorkerSize(len(kvRanges)) + if len(kvRanges) == 0 { + break + } + scheduler.setMaxWorkerSize(len(kvRanges)) err = scheduler.adjustWorkerSize() if err != nil { return errors.Trace(err) @@ -854,14 +862,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - - if len(remains) == 0 { - if ingestBeCtx != nil { - ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) - } + if len(remains) > 0 { + startKey = remains[0].StartKey + } else { + startKey = kvRanges[len(kvRanges)-1].EndKey + } + if startKey.Cmp(endKey) >= 0 { break } - startKey = remains[0].StartKey + } + if ingestBeCtx != nil { + ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) } return nil } diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 9b79704d3e7b4..711d0beccf228 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -334,3 +334,85 @@ func TestAddIndexIngestUniqueKey(t *testing.T) { tk.MustExec("split table t by ('m');") tk.MustGetErrMsg("alter table t add unique index idx(b, c);", "[kv:1062]Duplicate entry '1-c1' for key 't.idx'") } +<<<<<<< HEAD +======= + +func TestAddIndexIngestCancel(t *testing.T) { + store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);") + defHook := dom.DDL().GetHook() + customHook := newTestCallBack(t, dom) + cancelled := false + customHook.OnJobRunBeforeExported = func(job *model.Job) { + if cancelled { + return + } + if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization { + idx := testutil.FindIdxInfo(dom, "addindexlit", "t", "idx") + if idx == nil { + return + } + if idx.BackfillState == model.BackfillStateRunning { + tk2 := testkit.NewTestKit(t, store) + rs, err := tk2.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) + assert.NoError(t, err) + assert.NoError(t, rs.Close()) + cancelled = true + } + } + } + dom.DDL().SetHook(customHook) + tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob) + require.True(t, cancelled) + dom.DDL().SetHook(defHook) + require.Empty(t, ingest.LitBackCtxMgr.Keys()) +} + +func TestAddIndexSplitTableRanges(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 8; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1")) + + ddl.SetBackfillTaskChanSizeForTest(4) + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(7) + tk.MustExec("alter table t add index idx_2(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(1024) +} + +type testCallback struct { + ddl.Callback + OnJobRunBeforeExported func(job *model.Job) +} + +func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback { + defHookFactory, err := ddl.GetCustomizedHook("default_hook") + require.NoError(t, err) + return &testCallback{ + Callback: defHookFactory(dom), + } +} + +func (c *testCallback) OnJobRunBefore(job *model.Job) { + if c.OnJobRunBeforeExported != nil { + c.OnJobRunBeforeExported(job) + } +} +>>>>>>> 9df59ea50a6 (ddl: correct the remain ranges check for adding index (#41460)) From b9e6636f9b8108ec7a0156db11ae5fbdbd752bdf Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 16 Feb 2023 15:35:07 +0800 Subject: [PATCH 2/2] resolve conflict --- .../addindextest/integration_test.go | 59 ------------------- 1 file changed, 59 deletions(-) diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 711d0beccf228..df9c9baa05931 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -334,45 +334,6 @@ func TestAddIndexIngestUniqueKey(t *testing.T) { tk.MustExec("split table t by ('m');") tk.MustGetErrMsg("alter table t add unique index idx(b, c);", "[kv:1062]Duplicate entry '1-c1' for key 't.idx'") } -<<<<<<< HEAD -======= - -func TestAddIndexIngestCancel(t *testing.T) { - store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("drop database if exists addindexlit;") - tk.MustExec("create database addindexlit;") - tk.MustExec("use addindexlit;") - tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) - tk.MustExec("create table t (a int, b int);") - tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);") - defHook := dom.DDL().GetHook() - customHook := newTestCallBack(t, dom) - cancelled := false - customHook.OnJobRunBeforeExported = func(job *model.Job) { - if cancelled { - return - } - if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization { - idx := testutil.FindIdxInfo(dom, "addindexlit", "t", "idx") - if idx == nil { - return - } - if idx.BackfillState == model.BackfillStateRunning { - tk2 := testkit.NewTestKit(t, store) - rs, err := tk2.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) - assert.NoError(t, err) - assert.NoError(t, rs.Close()) - cancelled = true - } - } - } - dom.DDL().SetHook(customHook) - tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob) - require.True(t, cancelled) - dom.DDL().SetHook(defHook) - require.Empty(t, ingest.LitBackCtxMgr.Keys()) -} func TestAddIndexSplitTableRanges(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) @@ -396,23 +357,3 @@ func TestAddIndexSplitTableRanges(t *testing.T) { tk.MustExec("admin check table t;") ddl.SetBackfillTaskChanSizeForTest(1024) } - -type testCallback struct { - ddl.Callback - OnJobRunBeforeExported func(job *model.Job) -} - -func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback { - defHookFactory, err := ddl.GetCustomizedHook("default_hook") - require.NoError(t, err) - return &testCallback{ - Callback: defHookFactory(dom), - } -} - -func (c *testCallback) OnJobRunBefore(job *model.Job) { - if c.OnJobRunBeforeExported != nil { - c.OnJobRunBeforeExported(job) - } -} ->>>>>>> 9df59ea50a6 (ddl: correct the remain ranges check for adding index (#41460))