Commit

This is an automated cherry-pick of #56082

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
mjonss authored and ti-chi-bot committed Dec 10, 2024
1 parent b298e21 commit 44c00a9
Showing 17 changed files with 9,292 additions and 122 deletions.
6,776 changes: 6,776 additions & 0 deletions pkg/ddl/executor.go

Large diffs are not rendered by default.

172 changes: 156 additions & 16 deletions pkg/ddl/partition.go
@@ -95,7 +95,11 @@ func checkAddPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.P
func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Handle the rolling back job
if job.IsRollingback() {
<<<<<<< HEAD
ver, err := w.onDropTablePartition(d, t, job)
=======
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
>>>>>>> 1e24d396082 (*: Drop partition DDL handling for overlapping partitions during State Changes (#56082))
if err != nil {
return ver, errors.Trace(err)
}
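
Both sides of the conflict above route the rollback through a drop-partition-style path; what differs is the worker plumbing, where master folds the separate d *ddlCtx and t *meta.Meta parameters into a single jobCtx *jobContext. A rough sketch of that consolidated context, with the field shape assumed from its usage in this diff rather than taken from the real definition:

type jobContext struct {
	*ddlCtx               // embedded DDL-wide context (assumed)
	metaMut *meta.Mutator // per-job meta mutator, replacing the explicit *meta.Meta argument (assumed)
}
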
@@ -341,20 +345,6 @@ func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, [
return physicalTableIDs, partNames, rollbackBundles
}

// Check if current table already contains DEFAULT list partition
func checkAddListPartitions(tblInfo *model.TableInfo) error {
for i := range tblInfo.Partition.Definitions {
for j := range tblInfo.Partition.Definitions[i].InValues {
for _, val := range tblInfo.Partition.Definitions[i].InValues[j] {
if val == "DEFAULT" { // should already be normalized
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
}
}
}
}
return nil
}

// checkAddPartitionValue checks the values of the partitions being added.
// For Range: the VALUES LESS THAN values must be strictly increasing for each partition.
// For List: if a Default partition exists,
@@ -394,10 +384,16 @@ func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) er
currentRangeValue = nextRangeValue
}
}
<<<<<<< HEAD
case model.PartitionTypeList:
err := checkAddListPartitions(meta)
if err != nil {
return err
=======
case pmodel.PartitionTypeList:
if meta.Partition.GetDefaultListPartition() != -1 {
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
>>>>>>> 1e24d396082 (*: Drop partition DDL handling for overlapping partitions during State Changes (#56082))
}
}
return nil
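
On the cherry-picked side, the removed checkAddListPartitions loop is replaced by a lookup on the partition metadata itself. A minimal sketch of what such a helper plausibly does, as a hypothetical stand-in for PartitionInfo.GetDefaultListPartition (the real method lives in the model package; its import path varies by branch):

func getDefaultListPartitionIdx(pi *model.PartitionInfo) int {
	for i, def := range pi.Definitions {
		for _, vals := range def.InValues {
			for _, val := range vals {
				if val == "DEFAULT" { // values are normalized before storage
					return i
				}
			}
		}
	}
	return -1 // no DEFAULT list partition
}
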
@@ -1951,7 +1947,79 @@ func dropLabelRules(_ *ddlCtx, schemaName, tableName string, partNames []string)
return infosync.UpdateLabelRules(context.TODO(), patch)
}

// rollbackLikeDropPartition does the rollback for REORGANIZE PARTITION and ADD PARTITION.
// It will drop newly created partitions that have not yet been used, including cleaning
// up label rules and bundles as well as indexes changed due to the global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
// Reset if it was a normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
tblInfo.Partition = nil
}
}

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
dropIndices = append(dropIndices, indexInfo)
}
}
for _, indexInfo := range dropIndices {
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)
}
if tblInfo.Partition != nil {
tblInfo.Partition.ClearReorgIntermediateInfo()
}

ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
args.OldPhysicalTblIDs = physicalTableIDs
job.FillFinishedArgs(args)
return ver, nil
}

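rollbackLikeDropPartition relies on the typed job-argument API (model.GetTablePartitionArgs / job.FillFinishedArgs) rather than the positional job.Args used on the HEAD side. A sketch of the payload being round-tripped, with the field set assumed from its usage in this diff:

type TablePartitionArgs struct {
	PartNames []string             // names of the partitions being dropped
	PartInfo  *model.PartitionInfo // new partition info for ADD/REORGANIZE/PARTITION BY

	// Filled via FillFinishedArgs when the job ends, later consumed by the
	// delete-range worker to GC the abandoned physical tables.
	OldPhysicalTblIDs []int64
}
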
// onDropTablePartition deletes old partition meta.
<<<<<<< HEAD
func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var partNames []string
partInfo := model.PartitionInfo{}
@@ -2009,22 +2077,80 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.Args = []interface{}{physicalTableIDs}
return ver, nil
}
=======
// States:
// StateNone
//
//	Old partitions are queued to be deleted (delete_range), global index up-to-date.
//
// StateDeleteReorganization
//
//	Old partitions are not accessible/used by any sessions.
//	Inserts/updates of a global index that still has entries pointing to old partitions
//	will overwrite those entries.
//	In the background we are reading all old partitions and deleting their entries from
//	the global indexes.
//
// StateDeleteOnly
//
//	Old partitions are no longer visible, but inserts/updates to the global indexes
//	will give duplicate key errors, even if the entries are from dropped partitions.
//	Note that ranges now overlap: the range of a dropped partition 'LESS THAN (N)'
//	is taken over by the next remaining partition.
//
// StateWriteOnly
//
//	Old partitions are blocked for read and write, but for reads the "overlapping"
//	partition is read instead. This means writes can only happen in the "overlapping"
//	partition's original range, not in the extended range opened up by the dropped
//	partitions.
//
// StatePublic
//
//	Original state, unaware of the DDL.
func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
partNames := args.PartNames
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
>>>>>>> 1e24d396082 (*: Drop partition DDL handling for overlapping partitions during State Changes (#56082))

var physicalTableIDs []int64
// In order to skip maintaining a state check in partitionDefinition, TiDB uses droppingDefinitions
// instead of a state field, so `job.SchemaState` is used here to determine the stage of this job.
originalState := job.SchemaState
switch job.SchemaState {
case model.StatePublic:
// CheckDropTablePartition returns an error if all partitions would be dropped or if a named partition doesn't exist.
// Here we mark the partitions as being dropped, so they are neither read from nor written to.
err = CheckDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// For the reasoning, see https://github.com/pingcap/tidb/issues/55888
// Only mark the partitions as being dropped, so they are no longer used, but not yet removed.
originalDefs := tblInfo.Partition.Definitions
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
tblInfo.Partition.Definitions = originalDefs
tblInfo.Partition.DDLState = model.StateWriteOnly
tblInfo.Partition.DDLAction = model.ActionDropTablePartition

job.SchemaState = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
case model.StateWriteOnly:
// Since the previous state does not use the dropping partitions,
// we can now actually remove them, allowing writes into the overlapping range
// of the next higher range partition or the LIST DEFAULT partition.
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
err = dropLabelRules(d, job.SchemaName, tblInfo.Name.L, partNames)
if err != nil {
// TODO: Add failpoint error/cancel injection and test failure/rollback and cancellation!
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}
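
Read top to bottom, the switch advances a DROP PARTITION job through the states documented above, one schema version per step. A compact, illustrative restatement of that progression (not code from the patch):

var dropPartitionProgression = []model.SchemaState{
	model.StatePublic,               // partitions marked as dropping, definitions kept
	model.StateWriteOnly,            // reads served by the "overlapping" partition
	model.StateDeleteOnly,           // definitions removed; global index may give duplicate key errors
	model.StateDeleteReorganization, // background cleanup of global index entries
	model.StateNone,                 // partitions queued for delete_range; job done
}
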
@@ -2060,12 +2186,14 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, err
}

tblInfo.Partition.DDLState = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because tidb does not maintaining the state check in partitionDefinition.
// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
tblInfo.Partition.DDLState = model.StateDeleteReorganization
job.SchemaState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteReorganization:
@@ -2125,6 +2253,8 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{physicalTableIDs}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
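
Once the final state transition is written, the job finishes and the dropped partitions' physical IDs are handed to the delete-range machinery. On the master side that hand-off goes through the finished args, mirroring the tail of rollbackLikeDropPartition above (the exact forward-path sequence is an assumption):

job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
args.OldPhysicalTblIDs = physicalTableIDs
job.FillFinishedArgs(args) // read later by the delete-range worker for GC
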
@@ -2245,6 +2375,7 @@ func (w *worker) onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
pi.DroppingDefinitions = truncatingDefinitions
pi.NewPartitionIDs = newIDs[:]

tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
case model.StateDeleteOnly:
@@ -2763,7 +2894,11 @@ func getReorgPartitionInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, []st
func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Handle the rolling back job
if job.IsRollingback() {
<<<<<<< HEAD
ver, err := w.onDropTablePartition(d, t, job)
=======
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
>>>>>>> 1e24d396082 (*: Drop partition DDL handling for overlapping partitions during State Changes (#56082))
if err != nil {
return ver, errors.Trace(err)
}
@@ -2882,7 +3017,12 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = model.StateDeleteOnly
<<<<<<< HEAD
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
=======
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
>>>>>>> 1e24d396082 (*: Drop partition DDL handling for overlapping partitions during State Changes (#56082))
if err != nil {
return ver, errors.Trace(err)
}
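
A recurring detail in this patch is that the in-flight DDL is now recorded on the partition metadata itself (tblInfo.Partition.DDLState and DDLAction, set for drop, truncate, and reorganize alike), so readers of the table info can tell that some partition ranges may currently be overlapped without consulting the job queue. A hypothetical reader-side check, not part of the patch:

func partitionDDLInFlight(pi *model.PartitionInfo) bool {
	return pi != nil && pi.DDLAction != model.ActionNone
}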