Skip to content

Commit

Permalink
br: add integration test for pitr (#47740) (#47907)
Browse files Browse the repository at this point in the history
ref #47738
  • Loading branch information
ti-chi-bot authored Oct 31, 2023
1 parent 9b0a74a commit 77e8bd9
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 48 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,7 +2242,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
OldTableInfo: t.Info,
NewTableID: newTableInfo.ID,
PartitionMap: getTableIDMap(newTableInfo, t.Info),
PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
IndexMap: getIndexIDMap(newTableInfo, t.Info),
}
}
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type AppliedFile interface {
GetEndKey() []byte
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
// getPartitionIDMap creates a map maping old physical ID to new physical ID.
func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := make(map[int64]int64)

if oldTable.Partition != nil && newTable.Partition != nil {
Expand All @@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
}
}

return tableIDMap
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := getPartitionIDMap(newTable, oldTable)
tableIDMap[oldTable.ID] = newTable.ID
return tableIDMap
}
Expand Down
96 changes: 57 additions & 39 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
filter "github.com/pingcap/tidb/util/table-filter"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -566,13 +568,9 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State == model.JobStateRollbackDone {
return sr.deleteRange(job)
}
return nil
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes,
model.ActionAddIndex, model.ActionAddPrimaryKey:
return sr.deleteRange(job)
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand All @@ -587,10 +585,11 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
}

func (sr *SchemasReplace) deleteRange(job *model.Job) error {
lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
dbReplace, exist := sr.DbMap[job.SchemaID]
if !exist {
// skip this mddljob, the same below
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
return nil
}

Expand Down Expand Up @@ -626,21 +625,24 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
newTableIDs := make([]int64, 0, len(tableIDs))
for tableID, tableReplace := range dbReplace.TableMap {
if _, exist := argsSet[tableID]; !exist {
log.Debug("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID))
logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
zap.Int64("oldTableID", tableID))
continue
}
newTableIDs = append(newTableIDs, tableReplace.NewTableID)
for partitionID, newPartitionID := range tableReplace.PartitionMap {
if _, exist := argsSet[partitionID]; !exist {
log.Debug("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID))
logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
zap.Int64("oldPartitionID", partitionID))
continue
}
newTableIDs = append(newTableIDs, newPartitionID)
}
}

if len(newTableIDs) != len(tableIDs) {
log.Debug("DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
logutil.CL(lctx).Warn(
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
// only drop newTableIDs' ranges
}

Expand All @@ -653,7 +655,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTable, model.ActionTruncateTable:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -665,17 +668,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}
if len(physicalTableIDs) > 0 {
// delete partition id instead of table id
for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
// delete partition id
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
}
Expand All @@ -685,32 +690,37 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}

for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
// iff job.State = model.JobStateRollbackDone
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
var indexID int64
Expand All @@ -720,14 +730,22 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}

tempIdxID := tablecodec.TempIndexPrefix | indexID
var elementID int64 = 1
indexIDs := []int64{indexID}
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}

if len(partitionIDs) > 0 {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn(
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}

Expand All @@ -740,7 +758,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -759,7 +777,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
// len(indexIDs) = 1
Expand All @@ -782,7 +800,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {

tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -791,7 +809,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -811,7 +829,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -820,7 +838,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -841,7 +859,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -850,7 +868,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -870,7 +888,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -879,7 +897,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand Down
36 changes: 30 additions & 6 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -435,8 +436,10 @@ var (
dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)}
dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)}
dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)}
Expand Down Expand Up @@ -581,23 +584,44 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID)

// roll back add index for table0
err = schemaReplace.deleteRange(rollBackTable0IndexJob)
err = schemaReplace.tryToGCJob(rollBackTable0IndexJob)
require.NoError(t, err)
for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
iargs = <-midr.indexCh
_, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID]
require.True(t, exist)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, len(iargs.indexIDs), 2)
require.Equal(t, iargs.indexIDs[0], int64(2))
require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2))
}

// roll back add index for table1
err = schemaReplace.deleteRange(rollBackTable1IndexJob)
err = schemaReplace.tryToGCJob(rollBackTable1IndexJob)
require.NoError(t, err)
iargs = <-midr.indexCh
require.Equal(t, iargs.tableID, mDDLJobTable1NewID)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, len(iargs.indexIDs), 2)
require.Equal(t, iargs.indexIDs[0], int64(2))
require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2))

// add index for table 0
err = schemaReplace.tryToGCJob(addTable0IndexJob)
require.NoError(t, err)
for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
iargs = <-midr.indexCh
_, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID]
require.True(t, exist)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2))
}

// add index for table 1
err = schemaReplace.tryToGCJob(addTable1IndexJob)
require.NoError(t, err)
iargs = <-midr.indexCh
require.Equal(t, iargs.tableID, mDDLJobTable1NewID)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2))

// drop index for table0
err = schemaReplace.deleteRange(dropTable0IndexJob)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/stream/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) {
434605479096221697,
"2022-07-15 20:32:12.734 +0800",
},
{
434605478903808000,
"2022-07-15 20:32:12 +0800",
},
}

timeZone, _ := time.LoadLocation("Asia/Shanghai")
Expand Down
Loading

0 comments on commit 77e8bd9

Please sign in to comment.