diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index 08230de72fc7f..d0289210b6ba9 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -5334,10 +5334,10 @@ func checkTiFlashReplicaCompatible(source *model.TiFlashReplicaInfo, target *mod return true } -func checkTableDefCompatible(source *model.TableInfo, target *model.TableInfo) error { +func checkTableDefCompatible(source *model.TableInfo, target *model.TableInfo) (bool, error) { // check temp table if target.TempTableType != model.TempTableNone { - return errors.Trace(dbterror.ErrPartitionExchangeTempTable.FastGenByArgs(target.Name)) + return false, errors.Trace(dbterror.ErrPartitionExchangeTempTable.FastGenByArgs(target.Name)) } // check auto_random @@ -5348,38 +5348,42 @@ func checkTableDefCompatible(source *model.TableInfo, target *model.TableInfo) e source.ShardRowIDBits != target.ShardRowIDBits || source.MaxShardRowIDBits != target.MaxShardRowIDBits || !checkTiFlashReplicaCompatible(source.TiFlashReplica, target.TiFlashReplica) { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } if len(source.Cols()) != len(target.Cols()) { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } // Col compatible check for i, sourceCol := range source.Cols() { targetCol := target.Cols()[i] if sourceCol.IsVirtualGenerated() != targetCol.IsVirtualGenerated() { - return dbterror.ErrUnsupportedOnGeneratedColumn.GenWithStackByArgs("Exchanging partitions for non-generated columns") + return false, dbterror.ErrUnsupportedOnGeneratedColumn.GenWithStackByArgs("Exchanging partitions for non-generated columns") } // It should strictyle compare expressions for generated columns if sourceCol.Name.L != targetCol.Name.L || sourceCol.Hidden != targetCol.Hidden || !checkFieldTypeCompatible(&sourceCol.FieldType, &targetCol.FieldType) || sourceCol.GeneratedExprString != targetCol.GeneratedExprString { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } if sourceCol.State != model.StatePublic || targetCol.State != model.StatePublic { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } if sourceCol.ID != targetCol.ID { - return dbterror.ErrPartitionExchangeDifferentOption.GenWithStackByArgs(fmt.Sprintf("column: %s", sourceCol.Name)) + return false, dbterror.ErrPartitionExchangeDifferentOption.GenWithStackByArgs(fmt.Sprintf("column: %s", sourceCol.Name)) } } if len(source.Indices) != len(target.Indices) { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } + globalIndex := false for _, sourceIdx := range source.Indices { + // TODO: Should we allow Exchange Partition without matching GLOBAL INDEX vs UNIQUE INDEX? + // Since then we don't need to drop the UNIQUE INDEX from the table->partition! + // But also there are bigger risk that there are duplicate rows... 
if sourceIdx.Global { - return dbterror.ErrPartitionExchangeDifferentOption.GenWithStackByArgs(fmt.Sprintf("global index: %s", sourceIdx.Name)) + globalIndex = true } var compatIdx *model.IndexInfo for _, targetIdx := range target.Indices { @@ -5389,31 +5393,32 @@ func checkTableDefCompatible(source *model.TableInfo, target *model.TableInfo) e } // No match index if compatIdx == nil { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } // Index type is not compatible if sourceIdx.Tp != compatIdx.Tp || sourceIdx.Unique != compatIdx.Unique || sourceIdx.Primary != compatIdx.Primary { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } // The index column if len(sourceIdx.Columns) != len(compatIdx.Columns) { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } for i, sourceIdxCol := range sourceIdx.Columns { compatIdxCol := compatIdx.Columns[i] if sourceIdxCol.Length != compatIdxCol.Length || sourceIdxCol.Name.L != compatIdxCol.Name.L { - return errors.Trace(dbterror.ErrTablesDifferentMetadata) + return false, errors.Trace(dbterror.ErrTablesDifferentMetadata) } } if sourceIdx.ID != compatIdx.ID { - return dbterror.ErrPartitionExchangeDifferentOption.GenWithStackByArgs(fmt.Sprintf("index: %s", sourceIdx.Name)) + // TODO: We could allow Global Index ID mismatches, since they will be rewritten any way. + return false, dbterror.ErrPartitionExchangeDifferentOption.GenWithStackByArgs(fmt.Sprintf("index: %s", sourceIdx.Name)) } } - return nil + return globalIndex, nil } func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { @@ -5471,7 +5476,7 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp return errors.Trace(err) } - err = checkTableDefCompatible(ptMeta, ntMeta) + _, err = checkTableDefCompatible(ptMeta, ntMeta) if err != nil { return errors.Trace(err) } @@ -5490,7 +5495,8 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp {Database: ptSchema.Name.L, Table: ptMeta.Name.L}, {Database: ntSchema.Name.L, Table: ntMeta.Name.L}, }, - SQLMode: ctx.GetSessionVars().SQLMode, + SQLMode: ctx.GetSessionVars().SQLMode, + ReorgMeta: NewDDLReorgMeta(ctx), } err = d.DoDDLJob(ctx, job) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 55ef0d59b624c..3d108021e0184 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" @@ -2116,7 +2117,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( } defer w.sessPool.Put(sctx) rh := newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs[0], elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version @@ -2314,7 +2315,7 @@ func (w *worker) onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } defer w.sessPool.Put(sctx) rh 
:= newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs[0], elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version @@ -2542,7 +2543,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - err = checkTableDefCompatible(pt, nt) + var globalIndex bool + globalIndex, err = checkTableDefCompatible(pt, nt) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -2564,8 +2566,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } } + // TODO: If not Global Index and WITHOUT VALIDATION, just do a quick swap! var ptInfo []schemaIDAndTableInfo - if len(nt.Constraints) > 0 { + if len(nt.Constraints) > 0 || globalIndex { pt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionTableID: nt.ID, ExchangePartitionDefID: defID, @@ -2579,11 +2582,59 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo ExchangePartitionTableID: ptID, ExchangePartitionDefID: defID, } + if globalIndex { + // How to handle new Unique index for the partition->table? + // - Somehow have a flag to say that a specific partition should have the extra index? + // How to handle the current Global Index? + // - Add the table->partition entries + // - Somehow have a flag to say that the table also needs to update the global index? + // How to handle duplicate entries? + // - if during reorganize -> fail and rollback + // - if user DML -> OK to return duplicate error? (hard to fail and rollback?) + var ptUniqueIndexes []*model.IndexInfo + var ntGlobalIndexes []*model.IndexInfo + for _, idx := range pt.Indices { + if idx.Global { + ptUnique := idx.Clone() + ptUnique.Global = false + ptUnique.State = model.StateDeleteOnly + ptUniqueIndexes = append(ptUniqueIndexes, ptUnique) + // TODO: + // How to flag that it should only be used for a single partition? + found := false + for _, ntIdx := range nt.Indices { + // For now, the IDs must match! + if ntIdx.ID == idx.ID { + ntGlobal := idx.Clone() + // TODO: how will this work?!? + // It should only Delete entries from the same table/partition ID + // otherwise it should ignore them! + ntGlobal.State = model.StateDeleteOnly + ntGlobalIndexes = append(ntGlobalIndexes, ntGlobal) + found = true + } + } + if !found { + job.State = model.JobStateCancelled + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + } + } + pt.Indices = append(pt.Indices, ptUniqueIndexes...) + nt.Indices = append(nt.Indices, ntGlobalIndexes...) + //pt.Partition.DroppingDefinitions = []model.PartitionDefinition{partDef.Clone()} + // Also set AddingDefinitions, to block out new global index entries until public! + // Should give duplicate error still. + addingDef := partDef.Clone() + addingDef.ID = nt.ID + pt.Partition.AddingDefinitions = []model.PartitionDefinition{addingDef} + } + // We need an interim schema version, // so there are no non-matching rows inserted // into the table using the schema version // before the exchange is made. - job.SchemaState = model.StateWriteOnly + job.SchemaState = model.StateDeleteOnly return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, ptInfo...) 
} // From now on, nt (the non-partitioned table) has @@ -2607,23 +2658,158 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } } + globalIndex := hasGlobalIndex(pt) - if withValidation { + switch job.SchemaState { + case model.StateDeleteOnly: + // Just move to StateWriteOnly, to be sure new conditions and indexes are used in all sessions + if globalIndex { + idxBalance := 0 + for _, idx := range pt.Indices { + if idx.State == model.StateDeleteOnly { + idx.State = model.StateWriteOnly + idxBalance++ + } + } + for _, idx := range nt.Indices { + if idx.State == model.StateDeleteOnly { + idx.State = model.StateWriteOnly + idxBalance-- + } + } + if idxBalance != 0 { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + } + job.SchemaState = model.StateWriteOnly + return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) + case model.StateWriteOnly: + if withValidation { + ntbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, nt) + if err != nil { + return ver, errors.Trace(err) + } + ptbl, err := getTable((*asAutoIDRequirement)(d), ptSchemaID, pt) + if err != nil { + return ver, errors.Trace(err) + } + err = checkExchangePartitionRecordValidation(w, ptbl, ntbl, ptDbInfo.Name.L, ntDbInfo.Name.L, partName) + if err != nil { + job.State = model.JobStateRollingback + return ver, errors.Trace(err) + } + } + + if globalIndex { + idxBalance := 0 + for _, idx := range pt.Indices { + if idx.State == model.StateWriteOnly { + idx.State = model.StateWriteReorganization + idxBalance++ + } + } + for _, idx := range nt.Indices { + if idx.State == model.StateWriteOnly { + idx.State = model.StateWriteReorganization + idxBalance-- + } + } + if idxBalance != 0 { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + job.SchemaState = model.StateWriteReorganization + return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) + } + // else Done, complete the exchange! 
+ case model.StateWriteReorganization: + // TODO: + // Fill in the global index with the table->partition entries + // Fill in the new unique index with the partition->table entries ntbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, nt) if err != nil { + job.State = model.JobStateRollingback return ver, errors.Trace(err) } + nPhysTbl, ok := ntbl.(table.PhysicalTable) + if !ok { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } ptbl, err := getTable((*asAutoIDRequirement)(d), ptSchemaID, pt) if err != nil { + job.State = model.JobStateRollingback return ver, errors.Trace(err) } - err = checkExchangePartitionRecordValidation(w, ptbl, ntbl, ptDbInfo.Name.L, ntDbInfo.Name.L, partName) + partTbl, ok := ptbl.(table.PartitionedTable) + if !ok { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + var done bool + done, ver, err = doPartitionExchangeIndexReorgWork(w, d, t, job, partTbl, nPhysTbl) + if !done { + return ver, err + } if err != nil { job.State = model.JobStateRollingback - return ver, errors.Trace(err) + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + idxBalance := 0 + for _, idx := range pt.Indices { + if idx.State == model.StateWriteReorganization { + idx.State = model.StateDeleteReorganization + idxBalance++ + } + } + for _, idx := range nt.Indices { + if idx.State == model.StateWriteReorganization { + idx.State = model.StateDeleteReorganization + idxBalance-- + } + } + if idxBalance != 0 { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) } + job.SchemaState = model.StateDeleteReorganization + return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) + case model.StateDeleteReorganization: + newIdxs := make([]*model.IndexInfo, 0) + idxBalance := 0 + for _, idx := range pt.Indices { + if idx.State == model.StateDeleteReorganization { + idxBalance++ + } else { + newIdxs = append(newIdxs, idx) + } + } + pt.Indices = newIdxs + newIdxs = make([]*model.IndexInfo, 0) + for _, idx := range nt.Indices { + if idx.State == model.StateDeleteReorganization { + idxBalance-- + } else { + newIdxs = append(newIdxs, idx) + } + } + nt.Indices = newIdxs + if idxBalance != 0 { + job.State = model.JobStateRollingback + return ver, errors.Trace(dbterror.ErrTablesDifferentMetadata) + } + pt.Partition.AddingDefinitions = nil } - // partition table auto IDs. 
ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { @@ -2755,7 +2941,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo job.SchemaState = model.StatePublic nt.ExchangePartitionInfo = nil - ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) if err != nil { return ver, errors.Trace(err) } @@ -3315,6 +3504,66 @@ func newStatsDDLEventForJob( return event, nil } +// doPartitionExchangeIndexReorgWork add the exchanged tables entries +// to the global index and creates the missing unique index on the old partition +func doPartitionExchangeIndexReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, pt table.PartitionedTable, nt table.PhysicalTable) (done bool, ver int64, err error) { + job.ReorgMeta.ReorgTp = model.ReorgTypeTxn + sctx, err1 := w.sessPool.Get() + if err1 != nil { + return done, ver, err1 + } + defer w.sessPool.Put(sctx) + rh := newReorgHandler(sess.NewSession(sctx)) + // update the global indexes with entries from table->partition (nt) + elements := make([]*meta.Element, 0) + for _, index := range pt.Meta().Indices { + if index.Global { + elements = append(elements, &meta.Element{ID: index.ID, TypeKey: meta.IndexElementKey}) + } + } + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, nt, elements, false) + if err != nil { + return false, ver, errors.Trace(err) + } + + // create a new unique index for partition->table + // How to use reorgInfo for this? I.e. how can we resume/continue on errors/timeout + // between Adding to Global Index and creating the unique local index? + err = w.runReorgJob(reorgInfo, nt.Meta(), d.lease, func() (reorgErr error) { + defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork", + func() { + reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", nt.Meta().Name) + }, false) + return w.reorgExchangeIndexes(d, pt, nt, reorgInfo) + }) + if err != nil { + if dbterror.ErrPausedDDLJob.Equal(err) { + return false, ver, nil + } + + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // If timeout, we should return, check for the owner and re-wait job done. + return false, ver, nil + } + if kv.IsTxnRetryableError(err) { + return false, ver, errors.Trace(err) + } + if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.DDLLogger().Warn("reorg partition job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", + zap.Stringer("job", job), zap.Error(err1)) + } + logutil.DDLLogger().Warn("reorg partition job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) + // TODO: rollback new global indexes! TODO: How to handle new index ids? 
+ ver, err = convertAddTablePartitionJob2RollbackJob(d, t, job, err, pt.Meta()) + return false, ver, errors.Trace(err) + } + return true, ver, err +} + func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, physTblIDs []int64) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn sctx, err1 := w.sessPool.Get() @@ -3340,7 +3589,8 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, partTbl, physTblIDs, elements) + // TODO: just use firstPartID instead of a slice. + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, partTbl, physTblIDs[0], elements) err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (reorgErr error) { defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork", func() { @@ -3584,6 +3834,83 @@ func (w *reorgPartitionWorker) GetCtx() *backfillCtx { return w.backfillCtx } +func (w *worker) reorgExchangeIndexes(d *ddlCtx, pt table.PartitionedTable, nt table.PhysicalTable, reorgInfo *reorgInfo) (err error) { + // Now add to all the global indexes from the old table + + if reorgInfo.PhysicalTableID == nt.Meta().ID { + err = w.addTableIndex(pt, reorgInfo) + if err != nil { + return errors.Trace(err) + } + createUniqueIndexes := false + for _, indexInfo := range nt.Meta().Indices { + if indexInfo.Global && indexInfo.State == model.StateWriteReorganization { + createUniqueIndexes = true + break + } + } + if createUniqueIndexes { + reorgInfo.PhysicalTableID = pt.Meta().ExchangePartitionInfo.ExchangePartitionDefID + } + // Currently the indexes must have the same IDs, so OK to reuse! + reorgInfo.currElement = reorgInfo.elements[0] + // Get the original start handle and end handle. + currentVer, err := getValidCurrentVersion(reorgInfo.d.store) + if err != nil { + return errors.Trace(err) + } + physTbl := pt.GetPartition(pt.Meta().ExchangePartitionInfo.ExchangePartitionDefID) + startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) + if err != nil { + return errors.Trace(err) + } + + // Always (re)start with the full PhysicalTable range + reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle + + // Write the reorg info to store so the whole reorganize process can recover from panic. + err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool) + logutil.DDLLogger().Info("update column and indexes", + zap.Int64("jobID", reorgInfo.Job.ID), + zap.ByteString("elementType", reorgInfo.currElement.TypeKey), + zap.Int64("elementID", reorgInfo.currElement.ID), + zap.Int64("partitionTableId", physTbl.GetPhysicalID()), + zap.String("startHandle", hex.EncodeToString(reorgInfo.StartKey)), + zap.String("endHandle", hex.EncodeToString(reorgInfo.EndKey))) + if err != nil { + return errors.Trace(err) + } + } + if reorgInfo.PhysicalTableID == nt.Meta().ID { + return nil + } + // We want to use a table.PhysicalTable that looks like the new table, + // I.e. it should have both TableID and PhysicalID set to the exchanged partition ID. + // And the indexes should be set correctly, with the same IDs as both the partitioned table and + // the exchanged table. 
+ tblInfo := pt.Meta().Clone() + tblInfo.Partition = nil + tblInfo.ID = pt.Meta().ExchangePartitionInfo.ExchangePartitionDefID + newIdxs := make([]*model.IndexInfo, 0) +IDX: + for _, idx := range tblInfo.Indices { + for _, element := range reorgInfo.elements { + if bytes.Equal(element.TypeKey, meta.IndexElementKey) { + if element.ID == idx.ID && idx.State == model.StateWriteReorganization { + newIdxs = append(newIdxs, idx.Clone()) + continue IDX + } + } + } + } + tblInfo.Indices = newIdxs + allocs := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), reorgInfo.Job.SchemaID, nt.Meta()) + physTbl, err := table.TableFromMeta(allocs, tblInfo) + // Make sure to use the correct index and table, to populate the LOCAL UNIQUE partition index, + // which will be the new table's UNIQUE (possible PK) index. + return w.addTableIndex(physTbl, reorgInfo) +} + func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) (err error) { // First copy all table data to the new AddingDefinitions partitions // from each of the DroppingDefinitions partitions. diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index b31299e161d39..7aa6c5f989262 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -483,6 +483,8 @@ type reorgInfo struct { // DDL reorganize for a partitioned table will handle partitions one by one, // PhysicalTableID is used to trace the current partition we are handling. // If the table is not partitioned, PhysicalTableID would be TableID. + // For Exchange Partition, it will point either to the exchanged table + // or the partition, depending on state in reorg. PhysicalTableID int64 dbInfo *model.DBInfo elements []*meta.Element @@ -803,7 +805,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, return &info, nil } -func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.PartitionedTable, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) { +func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.PartitionedTable, firstPartID int64, elements []*meta.Element) (*reorgInfo, error) { var ( element *meta.Element start kv.Key @@ -820,7 +822,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo if err != nil { return nil, errors.Trace(err) } - pid = partitionIDs[0] + pid = firstPartID physTbl := tbl.GetPartition(pid) start, end, err = getTableRange(ctx, d, physTbl, ver.Ver, job.Priority) diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 68430c74e9c14..509652374acb6 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -51,6 +51,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/keydecoder" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -3764,3 +3765,84 @@ func checkGlobalAndPK(t *testing.T, tk *testkit.TestKit, name string, indexes in require.True(t, idxInfo.Primary) } } + +func TestExchangePartitionGlobalIndex(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_index=ON") + defer func() { + tk.MustExec("set tidb_enable_global_index=default") + }() + + tk.MustExec(`create 
table t (a int, b int, primary key (a) nonclustered) partition by hash (b) partitions 3`) + tk.MustExec(`create table t2 (a int, b int, primary key (a) nonclustered)`) + tk.MustExec(`insert into t values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5)`) + tk.MustExec(`insert into t2 values (6,6),(9,9)`) + tOrigin := external.GetTableByName(t, tk, "test", "t").Meta() + t2Origin := external.GetTableByName(t, tk, "test", "t2").Meta() + tk.MustExec(`alter table t exchange partition p0 with table t2 without validation`) + tk.MustQuery(`select * from t where a = 6`).Check(testkit.Rows("6 6")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "4 4", "5 5", "6 6", "9 9")) + tk.MustQuery(`select * from t2 where a = 3`).Check(testkit.Rows("3 3")) + tk.MustQuery(`select * from t2`).Check(testkit.Rows("0 0", "3 3")) + tNew := external.GetTableByName(t, tk, "test", "t").Meta() + require.NotEqual(t, tOrigin.UpdateTS, tNew.UpdateTS) + tOrigin.UpdateTS = tNew.UpdateTS + require.Less(t, tOrigin.Revision, tNew.Revision) + tOrigin.Revision = tNew.Revision + require.Less(t, tOrigin.Partition.Definitions[0].ID, tNew.Partition.Definitions[0].ID) + tOrigin.Partition.Definitions[0].ID = tNew.Partition.Definitions[0].ID + require.Equal(t, tOrigin, tNew) + t2New := external.GetTableByName(t, tk, "test", "t2").Meta() + require.NotEqual(t, t2Origin.UpdateTS, t2New.UpdateTS) + t2Origin.UpdateTS = t2New.UpdateTS + require.Less(t, t2Origin.Revision, t2New.Revision) + t2Origin.Revision = t2New.Revision + require.Greater(t, t2Origin.ID, t2New.ID) + t2Origin.ID = t2New.ID + require.Equal(t, t2Origin, t2New) + // TODO: Check that the index is cleaned up! + //tk.MustExec(`insert into t values (0,0)`) + tk.MustExec(`begin`) + // TODO: Do we need to trigger GC? IIRC we use UniStore, which does it directly? + /* + res := getAllKVsForTableID(t, tk.Session(), tNew.ID) + globalIndexEntries := []string{"0", "1", "2", "4", "5", "6", "9"} + for i, entry := range res { + if i < len(globalIndexEntries) { + require.Equal(t, globalIndexEntries[i], entry.IndexValues[0]) + require.Equal(t, tNew.Indices[0].ID, entry.IndexID) + } + } + tk.MustExec(`alter table t exchange partition p0 with table t2 with validation`) + tk.MustQuery(`select * from t where a = 3`).Check(testkit.Rows("3 3")) + tk.MustQuery(`select * from t2 where a = 6`).Check(testkit.Rows("6 6")) + */ + // TODO: Check all valid entries in both table and indexes! 
+} + +// func DecodeKey(key []byte, is infoschema.InfoSchema) (DecodedKey, error) { +func getAllKVsForTableID(t *testing.T, ctx sessionctx.Context, tblID int64) []keydecoder.DecodedKey { + is := domain.GetDomain(ctx).InfoSchema() + txn, err := ctx.Txn(true) + require.NoError(t, err) + defer func() { + err := txn.Rollback() + require.NoError(t, err) + }() + + prefix := tablecodec.EncodeTablePrefix(tblID) + end := tablecodec.EncodeTablePrefix(tblID + 1) + it, err := txn.Iter(prefix, end) + res := make([]keydecoder.DecodedKey, 0) + require.NoError(t, err) + for it.Valid() { + key, err := keydecoder.DecodeKey(it.Key(), is) + require.NoError(t, err) + res = append(res, key) + err = it.Next() + require.NoError(t, err) + } + return res +} diff --git a/tests/integrationtest/r/globalindex/ddl.result b/tests/integrationtest/r/globalindex/ddl.result new file mode 100644 index 0000000000000..de9293e1d9281 --- /dev/null +++ b/tests/integrationtest/r/globalindex/ddl.result @@ -0,0 +1,15 @@ +set tidb_enable_global_index = 1; +create table t (a int, b int, primary key (a) nonclustered) partition by hash (b) partitions 3; +create table t2 (a int, b int, primary key (a) nonclustered); +alter table t exchange partition p2 with table t2; +drop table t, t2; +create table t (a int, b int, unique key (a)) partition by hash (b) partitions 3; +create table t2 (a int, b int, unique key (a)); +alter table t exchange partition p2 with table t2; +drop table t, t2; +create table t (a varchar(255), b varchar(255), primary key (a,b) clustered, unique key (a)) partition by key (b) partitions 3; +create table t2 (a varchar(255), b varchar(255), primary key (a,b) clustered, unique key (a)); +alter table t exchange partition p2 with table t2; +Error 8200 (HY000): Unsupported partition type of table t when exchanging partition +drop table t, t2; +set tidb_enable_global_index = default; diff --git a/tests/integrationtest/t/globalindex/ddl.test b/tests/integrationtest/t/globalindex/ddl.test new file mode 100644 index 0000000000000..6220f4f345346 --- /dev/null +++ b/tests/integrationtest/t/globalindex/ddl.test @@ -0,0 +1,16 @@ +# Not supporting EXCHANGE PARTITION with GLOBAL INDEX +set tidb_enable_global_index = 1; +create table t (a int, b int, primary key (a) nonclustered) partition by hash (b) partitions 3; +create table t2 (a int, b int, primary key (a) nonclustered); +alter table t exchange partition p2 with table t2; +drop table t, t2; +create table t (a int, b int, unique key (a)) partition by hash (b) partitions 3; +create table t2 (a int, b int, unique key (a)); +alter table t exchange partition p2 with table t2; +drop table t, t2; +create table t (a varchar(255), b varchar(255), primary key (a,b) clustered, unique key (a)) partition by key (b) partitions 3; +create table t2 (a varchar(255), b varchar(255), primary key (a,b) clustered, unique key (a)); +--error 8200 +alter table t exchange partition p2 with table t2; +drop table t, t2; +set tidb_enable_global_index = default;
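
Reviewer note on the new multi-state flow in `onExchangeTablePartition`: the job now walks DeleteOnly → WriteOnly → WriteReorganization → DeleteReorganization, cloning each global index of the partitioned table as a not-yet-global unique index on the pt side (destined for the outgoing partition) and as a global index on the nt side (so the incoming table's rows get backfilled into the global index), and at every transition it advances both sets of clones in lockstep and checks the counts match (`idxBalance`). The sketch below models only that bookkeeping, using simplified local types rather than TiDB's `model` package, and collapses the one-transition-per-DDL-round loop for readability.

```go
package main

import "fmt"

// Minimal sketch of the idxBalance bookkeeping, assuming simplified local
// types instead of pkg/parser/model; the real code advances one schema
// state per DDL round rather than looping.
type state int

const (
	deleteOnly state = iota
	writeOnly
	writeReorganization
	deleteReorganization
)

type index struct {
	name  string
	state state
}

// advance moves every temporary index currently in `from` to `to` on both
// sides and reports whether the two sides moved the same number of indexes,
// i.e. every cloned index on the partitioned table still has its cloned
// counterpart on the non-partitioned table.
func advance(ptIdx, ntIdx []*index, from, to state) bool {
	balance := 0
	for _, idx := range ptIdx {
		if idx.state == from {
			idx.state = to
			balance++
		}
	}
	for _, idx := range ntIdx {
		if idx.state == from {
			idx.state = to
			balance--
		}
	}
	return balance == 0
}

func main() {
	// One global unique index on the partitioned table is mirrored as a
	// local unique clone (pt side) and a global clone (nt side).
	pt := []*index{{"uk_a (local clone)", deleteOnly}}
	nt := []*index{{"uk_a (global clone)", deleteOnly}}
	steps := []struct{ from, to state }{
		{deleteOnly, writeOnly},
		{writeOnly, writeReorganization},            // record validation runs before this step
		{writeReorganization, deleteReorganization}, // index backfill runs in this state
	}
	for _, s := range steps {
		if !advance(pt, nt, s.from, s.to) {
			fmt.Println("index clone mismatch: roll the job back")
			return
		}
	}
	fmt.Println("clones advanced in lockstep; the final step drops them and swaps the tables")
}
```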
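
The reason `reorgExchangeIndexes` clones `pt.Meta()`, clears `Partition` and overwrites `ID` with `ExchangePartitionDefID` is that row and index keys are always prefixed with the physical table ID, so a backfill driven by the retargeted `TableInfo` writes the new local unique index into the outgoing partition's key range. A tiny sketch of the key layout this relies on; the IDs are made up:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/tablecodec"
)

func main() {
	// Hypothetical IDs: 100 = partitioned table, 112 = the partition being
	// exchanged, 1 = the unique index shared by both tables.
	const tableID, partitionID, indexID = 100, 112, 1

	// Global index entries live under the table ID ...
	fmt.Printf("global index prefix:  %x\n", tablecodec.EncodeTableIndexPrefix(tableID, indexID))
	// ... while the backfilled local unique index must live under the
	// partition's physical ID, which is why the cloned TableInfo gets
	// ID = ExchangePartitionDefID before table.TableFromMeta is called.
	fmt.Printf("local index prefix:   %x\n", tablecodec.EncodeTableIndexPrefix(partitionID, indexID))
	fmt.Printf("partition row prefix: %x\n", tablecodec.EncodeTablePrefix(partitionID))
}
```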
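
Once the commented-out verification in TestExchangePartitionGlobalIndex is enabled, something along these lines could also assert that only global index entries remain directly under the partitioned table's ID (row data and local index entries sit under the partition IDs). This is a sketch only: it reuses the `getAllKVsForTableID` helper added in this diff, assumes the imports already present in db_partition_test.go, and the assertions describe the intended end state rather than verified behaviour of the current patch.

```go
func TestExchangePartitionGlobalIndexEntries(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set tidb_enable_global_index=ON")
	defer tk.MustExec("set tidb_enable_global_index=default")

	tk.MustExec(`create table t (a int, b int, primary key (a) nonclustered) partition by hash (b) partitions 3`)
	tk.MustExec(`create table t2 (a int, b int, primary key (a) nonclustered)`)
	tk.MustExec(`insert into t values (1,1),(2,2),(4,4)`)
	tk.MustExec(`insert into t2 values (6,6),(9,9)`)
	tk.MustExec(`alter table t exchange partition p0 with table t2 without validation`)

	tblInfo := external.GetTableByName(t, tk, "test", "t").Meta()
	tk.MustExec(`begin`)
	entries := getAllKVsForTableID(t, tk.Session(), tblInfo.ID)
	for _, entry := range entries {
		// Everything stored directly under the table ID should belong to
		// the (single) global index; rows live under the partition IDs.
		require.Equal(t, tblInfo.Indices[0].ID, entry.IndexID)
	}
	// All five unique keys, original and exchanged, should be indexed exactly once.
	require.Len(t, entries, 5)
}
```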
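
Regarding the TODO about duplicate rows under WITHOUT VALIDATION: row validation only checks that incoming rows fit the partition definition, so a unique-key clash with a row that already lives in another partition can only be caught while backfilling the global index. Below is a hedged test sketch of that scenario; it assumes `pkg/errno` is imported, and the expected error code reflects the intended behaviour rather than something this patch guarantees yet.

```go
func TestExchangePartitionGlobalIndexDuplicate(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set tidb_enable_global_index=ON")
	defer tk.MustExec("set tidb_enable_global_index=default")

	tk.MustExec(`create table p (a int, b int, primary key (a) nonclustered) partition by hash (b) partitions 3`)
	tk.MustExec(`create table np (a int, b int, primary key (a) nonclustered)`)
	tk.MustExec(`insert into p values (1,1)`)  // a=1 lives in partition p1
	tk.MustExec(`insert into np values (1,0)`) // same unique key a=1, b maps to p0
	// WITHOUT VALIDATION skips the row check, so the global-index backfill in
	// StateWriteReorganization is the only place this duplicate can surface.
	tk.MustGetErrCode(`alter table p exchange partition p0 with table np without validation`, errno.ErrDupEntry)
}
```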