diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 43c66686c38..13fd237b3c3 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -224,24 +224,7 @@ func (m *ddlManager) tick( } for _, event := range events { - // TODO: find a better place to do this check - // check if the ddl event is belong to an ineligible table. - // If so, we should ignore it. - if !filter.IsSchemaDDL(event.Type) { - ignore, err := m.schema. - IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs) - if err != nil { - return nil, nil, errors.Trace(err) - } - if ignore { - log.Warn("ignore the DDL event of ineligible table", - zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event)) - continue - } - } - tableName := event.TableInfo.TableName - // Add all valid DDL events to the pendingDDLs. m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) } diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index ed7f505ca37..98a438c84fc 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -16,6 +16,7 @@ package puller import ( "context" "encoding/json" + "fmt" "sync" "sync/atomic" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sorter/memory" @@ -122,8 +124,7 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context) error { if job != nil { skip, err := p.handleJob(job) if err != nil { - return cerror.WrapError(cerror.ErrHandleDDLFailed, - err, job.String(), job.Query, job.StartTS, job.StartTS) + return err } log.Info("handle ddl job", zap.String("namespace", p.changefeedID.Namespace), @@ -358,7 +359,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, 
job.TableName) { return true, nil } - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || @@ -381,7 +383,17 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { case timodel.ActionRenameTables: skip, err = p.handleRenameTables(job) if err != nil { - return true, errors.Trace(err) + log.Warn("handle rename tables ddl job failed", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Error(err)) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } case timodel.ActionRenameTable: log.Info("rename table ddl job", @@ -396,7 +408,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { // 1. If we can not find the old table, and the new table name is in filter rule, return error. discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) if !discard { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } skip = true } else { @@ -408,7 +420,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. 
if skipByOldTableName && !skipByNewTableName { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } if skipByOldTableName && skipByNewTableName { skip = true @@ -437,13 +449,53 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { zap.String("table", job.BinlogInfo.TableInfo.Name.O), zap.String("job", job.String()), zap.Error(err)) - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } p.setResolvedTs(job.BinlogInfo.FinishedTS) p.schemaVersion = job.BinlogInfo.SchemaVersion - return false, nil + return p.checkIneligibleTableDDL(snap, job) +} + +// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL. +// 1. If it is not a table DDL, we shouldn't check it. +// 2. If the table after the DDL is ineligible: +// a. If the table is not exist before the DDL, we should ignore the DDL. +// b. If the table is ineligible before the DDL, we should ignore the DDL. +// c. If the table is eligible before the DDL, we should return an error. +func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) { + if filter.IsSchemaDDL(job.Type) { + return false, nil + } + + ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID) + if !ineligible { + return false, nil + } + + // If the table is not in the snapshot before the DDL, + // we should ignore the DDL. + _, exist := snapBefore.PhysicalTableByID(job.TableID) + if !exist { + return true, nil + } + + // If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL. + // If so, we should return an error to inform the user that it is a + // dangerous operation and should be handled manually. 
+ isBeforeIneligible := snapBefore.IsIneligibleTableID(job.TableID) + if isBeforeIneligible { + log.Warn("ignore the DDL event of ineligible table", + zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job)) + return true, nil + } + return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+ "it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+ "please pause the changefeed and update the `force-replicate=true` "+ "in the changefeed configuration, "+ "then resume the changefeed.", job.Query)) } func findDBByName(dbs []*timodel.DBInfo, name string) (*timodel.DBInfo, error) { diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index aa51bf1ef6b..b5a3ffa719c 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -199,23 +199,23 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t1(id int)") + job = helper.DDL2Job("create table test1.t1(id int primary key)") remainTables[0] = job.TableID mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t5(id int 
primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -225,7 +225,7 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table ignore1.a(id int)") + job = helper.DDL2Job("create table ignore1.a(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -240,7 +240,7 @@ func TestHandleRenameTable(t *testing.T) { } { - _ = helper.DDL2Job("create table test1.t6(id int)") + _ = helper.DDL2Job("create table test1.t6(id int primary key)") job := helper.DDL2Job("rename table test1.t2 to test1.t22, test1.t6 to test1.t66") skip, err := ddlJobPullerImpl.handleRenameTables(job) require.Error(t, err) @@ -257,17 +257,17 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t1(id int)") + job = helper.DDL2Job("create table test2.t1(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t2(id int)") + job = helper.DDL2Job("create table test2.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t3(id int)") + job = helper.DDL2Job("create table test2.t3(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -285,13 +285,13 @@ func 
TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table Test3.t1(id int)") + job = helper.DDL2Job("create table Test3.t1(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // skip this table - job = helper.DDL2Job("create table Test3.t2(id int)") + job = helper.DDL2Job("create table Test3.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -307,34 +307,34 @@ func TestHandleRenameTable(t *testing.T) { // test rename table { - job := helper.DDL2Job("create table test1.t99 (id int)") + job := helper.DDL2Job("create table test1.t99 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t1000 (id int)") + job = helper.DDL2Job("create table test1.t1000 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t888 (id int)") + job = helper.DDL2Job("create table test1.t888 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t20230808 (id int)") + job = helper.DDL2Job("create table test1.t20230808 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = 
helper.DDL2Job("create table test1.t202308081 (id int)") + job = helper.DDL2Job("create table test1.t202308081 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t202308082 (id int)") + job = helper.DDL2Job("create table test1.t202308082 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -346,9 +346,8 @@ func TestHandleRenameTable(t *testing.T) { // since test1.t100 is in filter rule, replicate it job = helper.DDL2Job("rename table test1.t1000 to test1.t100") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.Error(t, err) - require.True(t, skip) require.Contains(t, err.Error(), fmt.Sprintf("table's old name is not in filter rule, and its new name in filter rule "+ "table id '%d', ddl query: [%s], it's an unexpected behavior, "+ "if you want to replicate this table, please add its old name to filter rule.", job.TableID, job.Query)) @@ -370,9 +369,8 @@ func TestHandleRenameTable(t *testing.T) { // but now it will throw an error since schema ignore1 are not in schemaStorage // ref: https://github.com/pingcap/tiflow/issues/9488 job = helper.DDL2Job("rename table test1.t202308081 to ignore1.ignore1, test1.t202308082 to ignore1.dongmen") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.NotNil(t, err) - require.True(t, skip) require.Contains(t, err.Error(), "ErrSnapshotSchemaNotFound") } } @@ -432,7 +430,7 @@ func TestHandleJob(t *testing.T) { // test create table { - job := helper.DDL2Job("create table test1.t1(id int) partition by range(id) (partition p0 values less than (10))") + job := helper.DDL2Job("create table test1.t1(id int primary key) partition by range(id) (partition p0 values less than 
(10))") skip, err := ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -442,7 +440,7 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.testStartTs(id int)") + job = helper.DDL2Job("create table test1.testStartTs(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -453,23 +451,23 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) - job = helper.DDL2Job("create table test1.t4(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test1.t4(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) // make sure no schema not found error - job = helper.DDL2Job("create table test3.t1(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test3.t1(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) @@ -645,7 +643,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 5, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, FinishedTS: 18}, - Query: "create table test.t1(id int)", + Query: "create table test.t1(id int primary key)", }) mockPuller.appendDDL(&timodel.Job{ ID: 1, @@ -653,7 +651,7 @@ 
func TestDDLPuller(t *testing.T) { StartTS: 5, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, FinishedTS: 16}, - Query: "create table t2(id int)", + Query: "create table t2(id int primary key)", }) resolvedTs, ddl = p.PopFrontDDL() require.Equal(t, resolvedTs, uint64(15)) @@ -678,7 +676,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) mockPuller.appendDDL(&timodel.Job{ ID: 3, @@ -686,7 +684,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) mockPuller.appendResolvedTs(30) waitResolvedTsGrowing(t, p, 25) @@ -708,7 +706,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateCancelled, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 6, FinishedTS: 36}, - Query: "create table t4(id int)", + Query: "create table t4(id int primary key)", }) mockPuller.appendResolvedTs(40) waitResolvedTsGrowing(t, p, 40) @@ -803,3 +801,65 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(200)) require.Nil(t, err) } + +func TestCheckIneligibleTableDDL(t *testing.T) { + startTs := uint64(10) + mockPuller := newMockPuller(t, startTs) + ddlJobPuller, helper := newMockDDLJobPuller(t, mockPuller, true) + defer helper.Close() + + ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl) + ddlJobPullerImpl.setResolvedTs(startTs) + + cfg := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(cfg, "") + require.NoError(t, err) + ddlJobPullerImpl.filter = f + + ddl := helper.DDL2Job("CREATE DATABASE test1") + skip, err := ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + 
require.False(t, skip) + + // case 1: create a table only has a primary key and drop it, expect an error. + // It is because the table is not eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t1 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t1 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") + + // case 2: create a table has a primary key and another not null unique key, + // and drop the primary key, expect no error. + // It is because the table is still eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t2 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) NOT NULL UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + // case 3: continue to drop the unique key, expect an error. + // It is because the table is not eligible after the drop unique key DDL. + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP INDEX email;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") +}