From 4909abdfc1ce649c7a07cf3f72ef44a8c6266037 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 13 Dec 2021 14:48:22 +0800 Subject: [PATCH 1/8] fix ticdc ddl special comment syntax error --- cdc/owner/changefeed.go | 26 ++++++- cdc/owner/changefeed_test.go | 143 +++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 2 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index aaf0f93f392..ddc94af68b7 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -15,6 +15,9 @@ package owner import ( "context" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/format" + "strings" "sync" "github.com/pingcap/errors" @@ -29,7 +32,6 @@ import ( "github.com/pingcap/ticdc/pkg/txnutil/gc" "github.com/pingcap/ticdc/pkg/util" timodel "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -449,7 +451,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don if err != nil { return false, errors.Trace(err) } - ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query) + ddlEvent.Query, err = addSpecialComment(ddlEvent.Query) + if err != nil { + return false, errors.Trace(err) + } + c.ddlEventCache = ddlEvent if c.redoManager.Enabled() { err = c.redoManager.EmitDDLEvent(ctx, ddlEvent) @@ -494,3 +500,19 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode func (c *changefeed) Close(ctx context.Context) { c.releaseResources(ctx) } + +// addSpecialComment translate tidb feature to comment +func addSpecialComment(ddlQuery string) (string, error) { + stms, _, err := parser.New().ParseSQL(ddlQuery) + if err != nil { + return "", errors.Trace(err) + } + if len(stms) != 1 { + log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery)) + } + var sb strings.Builder + if err = stms[0].Restore(format.NewRestoreCtx(format.RestoreTiDBSpecialComment, &sb)); err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index e3a630a4340..cc0d58e12bd 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -15,7 +15,9 @@ package owner import ( "context" + "github.com/stretchr/testify/require" "sync/atomic" + "testing" "time" "github.com/pingcap/check" @@ -352,3 +354,144 @@ func (s *changefeedSuite) TestFinished(c *check.C) { c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs) c.Assert(state.Info.State, check.Equals, model.StateFinished) } + +func TestAddSpecialComment(t *testing.T) { + testCase := []struct { + input string + result string + }{ + { + "create table t1 (id int ) shard_row_id_bits=2;", + "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */", + }, + { + "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", + "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", + "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + + { + "create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;", + "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;", + "CREATE TABLE t1 (id int) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */", + }, + { + "create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;", + "CREATE TABLE t6 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int primary key auto_random(2));", + "CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)", + }, + { + "create table t1 (id int primary key auto_random);", + "CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)", + }, + { + "create table t1 (id int auto_random ( 4 ) primary key);", + "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + }, + { + "create table t1 (id int auto_random ( 4 ) primary key);", + "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + }, + { + "create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;", + "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */", + }, + { + "create table t1 (id int auto_random primary key) auto_random_base = 50;", + "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */", + }, + { + "create table t1 (id int auto_increment key) auto_id_cache 100;", + "CREATE TABLE t1 (id int AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */", + }, + { + "create table t1 (id int auto_increment unique) auto_id_cache 10;", + "CREATE TABLE t1 (id int AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */", + }, + { + "create table t1 (id int) auto_id_cache = 5;", + "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int) auto_id_cache=5;", + "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;", + "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) clustered);", + "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] CLUSTERED */)", + }, + { + "create table t1(id int, v int, primary key(a) clustered);", + "CREATE TABLE t1 (id int,v int,PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */)", + }, + { + "create table t1(id int primary key clustered, v int);", + "CREATE TABLE t1 (id int PRIMARY KEY /*T![clustered_index] CLUSTERED */,v int)", + }, + { + "alter table t add primary key(a) clustered;", + "ALTER TABLE t ADD PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);", + "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);", + "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)", + }, + { + "create table clustered_test(id int)", + "CREATE TABLE clustered_test (id int)", + }, + { + "create database clustered_test", + "CREATE DATABASE clustered_test", + }, + { + "create database clustered", + "CREATE DATABASE clustered", + }, + { + "create table clustered (id int)", + "CREATE TABLE clustered (id int)", + }, + { + "create table t1 (id int, a varchar(255) key clustered);", + "CREATE TABLE t1 (id int,a varchar(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)", + }, + { + "alter table t force auto_increment = 12;", + "ALTER TABLE t /*T![force_inc] FORCE */ AUTO_INCREMENT = 12", + }, + { + "alter table t force, auto_increment = 12;", + "ALTER TABLE t FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12", + }, + { + "create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */", + "CREATE TABLE cdc_test (id varchar(10) PRIMARY KEY,c1 varchar(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 DEFAULT COLLATE = utf8mb4_bin /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */", + }, + } + for _, ca := range testCase { + re, err := addSpecialComment(ca.input) + require.Nil(t, err) + require.Equal(t, ca.result, re) + } + require.Panics(t, func() { + _, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;") + }) +} From c4e5aaa12eac7e6456eb0fb9e7e4e221ca1ee7cc Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 13 Dec 2021 14:55:10 +0800 Subject: [PATCH 2/8] fix lint --- cdc/owner/changefeed.go | 4 ++-- cdc/owner/changefeed_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ddc94af68b7..eead355f19a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -15,8 +15,6 @@ package owner import ( "context" - "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/parser/format" "strings" "sync" @@ -31,6 +29,8 @@ import ( "github.com/pingcap/ticdc/pkg/orchestrator" "github.com/pingcap/ticdc/pkg/txnutil/gc" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/format" timodel "github.com/pingcap/tidb/parser/model" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index cc0d58e12bd..7c0d8fe7ba2 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -15,7 +15,6 @@ package owner import ( "context" - "github.com/stretchr/testify/require" "sync/atomic" "testing" "time" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/ticdc/pkg/version" timodel "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) From 102aab423c96878e24d6b6473d6351cd0458b1b3 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 13 Dec 2021 16:01:50 +0800 Subject: [PATCH 3/8] fix ut --- cdc/owner/changefeed_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 7c0d8fe7ba2..12f9f8045e7 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -272,7 +272,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE test1") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -287,7 +287,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE test1.test1(id int primary key)") // executing the ddl finished mockAsyncSink.ddlDone = true From 3eefa97afab2a0fa3771c4e9e573346a6aab0c28 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Tue, 14 Dec 2021 10:33:27 +0800 Subject: [PATCH 4/8] fix ut --- cdc/owner/changefeed_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 12f9f8045e7..b0c8c771600 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -16,7 +16,6 @@ package owner import ( "context" "sync/atomic" - "testing" "time" "github.com/pingcap/check" @@ -31,7 +30,6 @@ import ( "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/ticdc/pkg/version" timodel "github.com/pingcap/tidb/parser/model" - "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) @@ -287,7 +285,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE test1.test1(id int primary key)") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE test1.test1 (id int PRIMARY KEY)") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -355,7 +353,7 @@ func (s *changefeedSuite) TestFinished(c *check.C) { c.Assert(state.Info.State, check.Equals, model.StateFinished) } -func TestAddSpecialComment(t *testing.T) { +func (s *changefeedSuite) TestAddSpecialComment(c *check.C) { testCase := []struct { input string result string @@ -488,10 +486,10 @@ func TestAddSpecialComment(t *testing.T) { } for _, ca := range testCase { re, err := addSpecialComment(ca.input) - require.Nil(t, err) - require.Equal(t, ca.result, re) + c.Check(err, check.IsNil) + c.Check(re, check.Equals, ca.result) } - require.Panics(t, func() { + c.Assert(func() { _, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;") - }) + }, check.Panics, "invalid ddlQuery statement size") } From cafac047213698cf172b351a59935fac41fd1619 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Tue, 14 Dec 2021 11:51:18 +0800 Subject: [PATCH 5/8] fix lint --- cdc/owner/changefeed_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index b0c8c771600..15aebe77973 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -354,6 +354,7 @@ func (s *changefeedSuite) TestFinished(c *check.C) { } func (s *changefeedSuite) TestAddSpecialComment(c *check.C) { + defer testleak.AfterTest(c)() testCase := []struct { input string result string From 28004662eb225f01d2796ff8e3b2b211208fb470 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 16 Dec 2021 13:50:24 +0800 Subject: [PATCH 6/8] update restore flags --- cdc/owner/changefeed.go | 3 +- cdc/owner/changefeed_test.go | 67 ++++++++++++++++++------------------ 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 68d7477a732..70138a78166 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -521,7 +521,8 @@ func addSpecialComment(ddlQuery string) (string, error) { log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery)) } var sb strings.Builder - if err = stms[0].Restore(format.NewRestoreCtx(format.RestoreTiDBSpecialComment, &sb)); err != nil { + if err = stms[0].Restore(format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes| + format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment, &sb)); err != nil { return "", errors.Trace(err) } return sb.String(), nil diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 15aebe77973..15434336bb4 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -270,7 +270,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE test1") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -285,7 +285,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE test1.test1 (id int PRIMARY KEY)") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -361,128 +361,127 @@ func (s *changefeedSuite) TestAddSpecialComment(c *check.C) { }{ { "create table t1 (id int ) shard_row_id_bits=2;", - "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */", }, { "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", - "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", }, { "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", - "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", }, - { "create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;", - "CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */", }, { "create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;", - "CREATE TABLE t1 (id int) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */", + "CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */", }, { "create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;", - "CREATE TABLE t6 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */", + "CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */", }, { "create table t1 (id int primary key auto_random(2));", - "CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)", }, { "create table t1 (id int primary key auto_random);", - "CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)", }, { "create table t1 (id int auto_random ( 4 ) primary key);", - "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", }, { "create table t1 (id int auto_random ( 4 ) primary key);", - "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", }, { "create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;", - "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */", }, { "create table t1 (id int auto_random primary key) auto_random_base = 50;", - "CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */", }, { "create table t1 (id int auto_increment key) auto_id_cache 100;", - "CREATE TABLE t1 (id int AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */", + "CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */", }, { "create table t1 (id int auto_increment unique) auto_id_cache 10;", - "CREATE TABLE t1 (id int AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */", + "CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */", }, { "create table t1 (id int) auto_id_cache = 5;", - "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", }, { "create table t1 (id int) auto_id_cache=5;", - "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", }, { "create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;", - "CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", }, { "create table t1 (id int, a varchar(255), primary key (a, b) clustered);", - "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] CLUSTERED */)", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)", }, { "create table t1(id int, v int, primary key(a) clustered);", - "CREATE TABLE t1 (id int,v int,PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */)", + "CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)", }, { "create table t1(id int primary key clustered, v int);", - "CREATE TABLE t1 (id int PRIMARY KEY /*T![clustered_index] CLUSTERED */,v int)", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)", }, { "alter table t add primary key(a) clustered;", - "ALTER TABLE t ADD PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */", + "ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */", }, { "create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);", - "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)", }, { "create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);", - "CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)", }, { "create table clustered_test(id int)", - "CREATE TABLE clustered_test (id int)", + "CREATE TABLE `clustered_test` (`id` INT)", }, { "create database clustered_test", - "CREATE DATABASE clustered_test", + "CREATE DATABASE `clustered_test`", }, { "create database clustered", - "CREATE DATABASE clustered", + "CREATE DATABASE `clustered`", }, { "create table clustered (id int)", - "CREATE TABLE clustered (id int)", + "CREATE TABLE `clustered` (`id` INT)", }, { "create table t1 (id int, a varchar(255) key clustered);", - "CREATE TABLE t1 (id int,a varchar(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)", }, { "alter table t force auto_increment = 12;", - "ALTER TABLE t /*T![force_inc] FORCE */ AUTO_INCREMENT = 12", + "ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12", }, { "alter table t force, auto_increment = 12;", - "ALTER TABLE t FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12", + "ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12", }, { "create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */", - "CREATE TABLE cdc_test (id varchar(10) PRIMARY KEY,c1 varchar(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 DEFAULT COLLATE = utf8mb4_bin /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */", + "CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */", }, } for _, ca := range testCase { From 9f2082467faf3d077af3ee5696c35c2e0b1ecdf2 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 16 Dec 2021 16:54:46 +0800 Subject: [PATCH 7/8] fix integration test --- tests/integration_tests/ddl_reentrant/run.sh | 64 +++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh index 6e788d9dcf6..7867179066d 100644 --- a/tests/integration_tests/ddl_reentrant/run.sh +++ b/tests/integration_tests/ddl_reentrant/run.sh @@ -8,26 +8,27 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -ddls=("create database ddl_reentrant" false - "create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false - "alter table ddl_reentrant.t1 add column b int" false - "alter table ddl_reentrant.t1 drop column b" false - "alter table ddl_reentrant.t1 add key index_a(a)" false - "alter table ddl_reentrant.t1 drop index index_a" false - "truncate table ddl_reentrant.t1" true - "alter table ddl_reentrant.t1 modify a varchar(20)" true - "rename table ddl_reentrant.t1 to ddl_reentrant.t2" false - "alter table ddl_reentrant.t2 alter a set default 'hello'" true - "alter table ddl_reentrant.t2 comment='modify comment'" true - "alter table ddl_reentrant.t2 rename index a to idx_a" false - "create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false - "alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false - "alter table ddl_reentrant.t3 drop partition p2" false - "alter table ddl_reentrant.t3 truncate partition p0" true - "create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false - "drop view ddl_reentrant.t3_view" false - "alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true - "alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true +# cdc parse and restore ddl with flags format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes|format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment +ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`' + "create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false 'CREATE TABLE `ddl_reentrant`.`t1` (`id` INT PRIMARY KEY,`id2` INT NOT NULL,`a` VARCHAR(10) NOT NULL,UNIQUE `a`(`a`),UNIQUE `id2`(`id2`))' + "alter table ddl_reentrant.t1 add column b int" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD COLUMN `b` INT' + "alter table ddl_reentrant.t1 drop column b" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP COLUMN `b`' + "alter table ddl_reentrant.t1 add key index_a(a)" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD INDEX `index_a`(`a`)' + "alter table ddl_reentrant.t1 drop index index_a" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP INDEX `index_a`' + "truncate table ddl_reentrant.t1" true 'TRUNCATE TABLE `ddl_reentrant`.`t1`' + "alter table ddl_reentrant.t1 modify a varchar(20)" true 'ALTER TABLE `ddl_reentrant`.`t1` MODIFY COLUMN `a` VARCHAR(20)' + "rename table ddl_reentrant.t1 to ddl_reentrant.t2" false 'RENAME TABLE `ddl_reentrant`.`t1` TO `ddl_reentrant`.`t2`' + "alter table ddl_reentrant.t2 alter a set default 'hello'" true 'ALTER TABLE `ddl_reentrant`.`t2` ALTER COLUMN `a` SET DEFAULT _UTF8MB4'"'hello'" + "alter table ddl_reentrant.t2 comment='modify comment'" true 'ALTER TABLE `ddl_reentrant`.`t2` COMMENT = '"'modify comment'" + "alter table ddl_reentrant.t2 rename index a to idx_a" false 'ALTER TABLE `ddl_reentrant`.`t2` RENAME INDEX `a` TO `idx_a`' + "create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false 'CREATE TABLE `ddl_reentrant`.`t3` (`a` INT PRIMARY KEY,`b` INT) PARTITION BY RANGE (`a`) (PARTITION `p0` VALUES LESS THAN (1000),PARTITION `p1` VALUES LESS THAN (2000))' + "alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))' + "alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`' + "alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`' + "create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`' + "drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`' + "alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI' + "alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER DATABASE `ddl_reentrant` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci' ) function complete_ddls() { @@ -36,14 +37,14 @@ function complete_ddls() { echo "skip some DDLs in tidb v4.0.x" else # DDLs that are supportted since 5.0 - ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false) - ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false) + ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT') + ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`') fi - ddls+=("alter table ddl_reentrant.t2 drop primary key" false) - ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false) - ddls+=("drop table ddl_reentrant.t2" false) - ddls+=("recover table ddl_reentrant.t2" false) - ddls+=("drop database ddl_reentrant" false) + ddls+=("alter table ddl_reentrant.t2 drop primary key" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP PRIMARY KEY') + ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD PRIMARY KEY `pk`(`id`)') + ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`') + ddls+=("recover table ddl_reentrant.t2" false 'RECOVER TABLE `ddl_reentrant`.`t2`') + ddls+=("drop database ddl_reentrant" false 'DROP DATABASE `ddl_reentrant`') } changefeedid="" @@ -94,14 +95,15 @@ tidb_build_branch=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e \ function ddl_test() { ddl=$1 is_reentrant=$2 + restored_sql=$3 echo "------------------------------------------" - echo "test ddl $ddl, is_reentrant: $is_reentrant" + echo "test ddl $ddl, is_reentrant: $is_reentrant restored_sql: $restored_sql" run_sql $ddl ${UP_TIDB_HOST} ${UP_TIDB_PORT} ensure 10 check_ts_forward $changefeedid - echo $ddl >${WORK_DIR}/ddl_temp.sql + echo $restored_sql >${WORK_DIR}/ddl_temp.sql ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log | tail -n 1 | grep -oE '"CommitTs\\":[0-9]{18}' | awk -F: '{print $(NF)}') cdc cli changefeed remove --changefeed-id=${changefeedid} @@ -146,7 +148,9 @@ function run() { idx=$((idx + 1)) idxs_reentrant=${ddls[$idx]} idx=$((idx + 1)) - ddl_test $ddl $idxs_reentrant + restored_sql=${ddls[$idx]} + idx=$((idx + 1)) + ddl_test $ddl $idxs_reentrant $restored_sql done IFS=$OLDIFS From 6aed328c35050d6026dc802ecf5dddf6fc06c072 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 17 Dec 2021 17:10:14 +0800 Subject: [PATCH 8/8] address comment --- cdc/owner/changefeed.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 78ffc291643..80878e0afc2 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -551,8 +551,15 @@ func addSpecialComment(ddlQuery string) (string, error) { log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery)) } var sb strings.Builder - if err = stms[0].Restore(format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes| - format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment, &sb)); err != nil { + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { return "", errors.Trace(err) } return sb.String(), nil