ddl: Fixed partitioning a non-partitioned table with placement rules (#57560) #57811

Merged

25 commits
85f6afc
Fixed partitioning a non-partitioned table with placement rules
mjonss Nov 20, 2024
0f12181
Linting
mjonss Nov 20, 2024
c02cc49
Removed handling of partition bundles, since it was only using table …
mjonss Nov 20, 2024
43601dd
Fixed update placement bundles for partitions
mjonss Nov 21, 2024
f7dedda
In REORG PARTITION we first register the intermediate state and then …
mjonss Nov 21, 2024
d20c285
Fixed failure and added tests for rollback of reorg partition and pla…
mjonss Nov 21, 2024
ac8640d
Added more tests and one minor fix
mjonss Nov 22, 2024
8b03577
Linting
mjonss Nov 22, 2024
f2eecf2
More advanced comparison for bundle rules leader overlap
mjonss Nov 22, 2024
ac4f20a
Renamed a function and added more comments
mjonss Nov 22, 2024
82c4875
Updated check for bundle, to match PD
mjonss Nov 25, 2024
9d4bc5e
Added enhancement issue reference
mjonss Nov 25, 2024
81783db
Linting
mjonss Nov 25, 2024
bae1ec5
Linting
mjonss Nov 25, 2024
1274c63
bazel_prepare
mjonss Nov 26, 2024
e8141c6
Simplified CheckBundle in mockPlacementManager
mjonss Nov 26, 2024
c1f9619
Linting
mjonss Nov 26, 2024
bf03e34
Eased the CheckBundle to pass more test.
mjonss Nov 26, 2024
f655589
Simplified CheckBundle
mjonss Nov 28, 2024
266bc08
Removed non-needed struct variables for check
mjonss Nov 28, 2024
978125e
Removed old comment
mjonss Nov 28, 2024
c7df0ff
Added more comments and simplified CheckBundle further
mjonss Nov 28, 2024
bb27194
Removed confusing test.
mjonss Nov 28, 2024
a95a21f
bazel_prepare
mjonss Nov 29, 2024
9d3b922
Merge remote-tracking branch 'pingcap/release-8.5' into cherry-pick-5…
mjonss Nov 29, 2024
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -358,6 +358,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
116 changes: 63 additions & 53 deletions pkg/ddl/partition.go
@@ -158,17 +158,12 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
}
}

bundles, err := alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
_, err = alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
@@ -269,33 +264,52 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64)
return false, nil
}

func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDefs []model.PartitionDefinition) (bool, error) {
// We want to achieve:
// - the placement rules are in place before we do any reorganization/writes to new partitions/global indexes
// - not removing any placement rules for removed partitions
// So we will:
// 1) First write the new bundles including both new and old partitions,
// EXCEPT if the old partition is in fact a table, then skip that partition
// 2) Then overwrite the bundles with the final partitioning scheme (second call in onReorganizePartition)

// tblInfo do not include added partitions, so we should add them first
tblInfo = tblInfo.Clone()
p := *tblInfo.Partition
p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...)
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p
p := tblInfo.Partition
if p != nil {
// if partitioning a non-partitioned table, we will first change the metadata,
// so the table looks like a partitioned table, with the first/only partition having
// the same partition ID as the table, so we can access the table as a single partition.
// But in this case we should not add a bundle rule for the same range
// both as table and partition.
if p.Definitions[0].ID != tblInfo.ID {
// prepend with existing partitions
addDefs = append(p.Definitions, addDefs...)
}
p.Definitions = addDefs
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

var bundles []*placement.Bundle
if tblBundle != nil {
bundles = append(bundles, tblBundle)
}

partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions)
partitionBundles, err := placement.NewPartitionListBundles(t, addDefs)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

bundles = append(bundles, partitionBundles...)
return bundles, nil

if len(bundles) > 0 {
return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}
return false, nil
}
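Note on the hunk above: the subtle case is `ALTER TABLE ... PARTITION BY` on a previously non-partitioned table, where the first/only partition temporarily shares the table's physical ID, so writing a bundle for it both as table and as partition would cover the same key range twice. A minimal standalone sketch of just that ID check, using toy stand-ins for the model types (not the real pkg/meta/model structs):

```go
package main

import "fmt"

// Toy stand-ins for model.TableInfo / model.PartitionDefinition; only the
// physical IDs matter for this check.
type partDef struct{ ID int64 }

type tableInfo struct {
	ID   int64
	Defs []partDef // current partition definitions
}

// mergeDefs mirrors the logic in alterTablePartitionBundles: prepend the
// existing partitions to the added ones, EXCEPT when the first "partition"
// is really the table itself (same physical ID), i.e. the intermediate state
// of ALTER TABLE ... PARTITION BY on a previously non-partitioned table.
func mergeDefs(tbl tableInfo, addDefs []partDef) []partDef {
	if len(tbl.Defs) > 0 && tbl.Defs[0].ID != tbl.ID {
		return append(tbl.Defs, addDefs...)
	}
	return addDefs
}

func main() {
	// Partitioning a non-partitioned table: the lone definition reuses the
	// table ID 100, so only the genuinely new partitions get bundles.
	fmt.Println(mergeDefs(tableInfo{ID: 100, Defs: []partDef{{100}}}, []partDef{{101}, {102}}))

	// Reorganizing an already-partitioned table: old partitions are kept so
	// their placement rules stay in place during the reorg.
	fmt.Println(mergeDefs(tableInfo{ID: 100, Defs: []partDef{{201}}}, []partDef{{202}}))
}
```

The second half of the strategy shows up later in the diff: onReorganizePartition calls alterTablePartitionBundles once more with nil added definitions, overwriting the bundles with the final partitioning scheme.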

// When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
@@ -335,20 +349,16 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode
tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...)
}

// rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo.
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, []*placement.Bundle) {
// removePartitionAddingDefinitionsFromTableInfo removes the `addingDefinitions` from the tableInfo.
func removePartitionAddingDefinitionsFromTableInfo(tblInfo *model.TableInfo) ([]int64, []string) {
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions))
rollbackBundles := make([]*placement.Bundle, 0, len(tblInfo.Partition.AddingDefinitions))
for _, one := range tblInfo.Partition.AddingDefinitions {
physicalTableIDs = append(physicalTableIDs, one.ID)
partNames = append(partNames, one.Name.L)
if one.PlacementPolicyRef != nil {
rollbackBundles = append(rollbackBundles, placement.NewBundle(one.ID))
}
}
tblInfo.Partition.AddingDefinitions = nil
return physicalTableIDs, partNames, rollbackBundles
return physicalTableIDs, partNames
}

// checkAddPartitionValue check add Partition Values,
@@ -2115,20 +2125,21 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
// It will drop newly created partitions that have not yet been used, including cleaning
// up label rules and bundles as well as changed indexes due to global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
args, err := model.GetTablePartitionArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// Collect the table/partition ids to clean up (via args.OldPhysicalTblIDs);
// GC will later also drop the matching placement bundles.
// If we deleted them now, it could lead to non-compliant placement or failures during flashback.
physicalTableIDs, pNames := removePartitionAddingDefinitionsFromTableInfo(tblInfo)
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
@@ -2143,7 +2154,9 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
if partInfo.NewTableID != 0 {
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
}
// Reset if it was normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
@@ -2172,6 +2185,11 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
tblInfo.Partition.ClearReorgIntermediateInfo()
}

_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3133,11 +3151,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
jobCtx.jobArgs = args
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
return w.rollbackLikeDropPartition(jobCtx, job)
}

tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
@@ -3155,6 +3169,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// The partInfo may have been checked against an older schema version for example.
// If the check is done here, it does not need to be repeated, since no other
// DDL on the same table can be run concurrently.
tblInfo.Partition.DDLAction = job.Type
num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions)
err = checkAddPartitionTooManyPartitions(uint64(num))
if err != nil {
@@ -3282,31 +3297,21 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK
}

bundles, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
changed, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}

if len(bundles) > 0 {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
changesMade = true
}
changesMade = changesMade || changed

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
}
changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids)
changed, err = alterTableLabelRule(job.SchemaName, tblInfo, ids)
changesMade = changesMade || changed
if err != nil {
if !changesMade {
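The error handling in this hunk follows a pattern used throughout the function: changesMade tracks whether anything external (placement bundles, label rules) has already been pushed to PD. A hedged sketch of that decision, with illustrative names rather than the real helpers:

```go
package main

import (
	"errors"
	"fmt"
)

// decide models the error handling in onReorganizePartition: before any
// external side effect has been written, a failing step can simply cancel
// the job; afterwards the job must enter the rollback path so the state
// machine undoes what was already pushed to PD.
func decide(changesMade bool, err error) string {
	if err == nil {
		return "continue"
	}
	if !changesMade {
		return "cancel job" // nothing external written yet
	}
	return "rollback reorganize partition" // PD already updated
}

func main() {
	boom := errors.New("PD unavailable")
	fmt.Println(decide(false, boom)) // cancel job
	fmt.Println(decide(true, boom))  // rollback reorganize partition
	fmt.Println(decide(true, nil))   // continue
}
```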
Expand All @@ -3332,7 +3337,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3581,7 +3585,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// REMOVE PARTITIONING
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
// TODO: How to carry over AUTO_INCREMENT etc.?
@@ -3595,7 +3598,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
tblInfo.ID = partInfo.NewTableID
if partInfo.DDLType != pmodel.PartitionTypeNone {
if oldTblID != physicalTableIDs[0] {
// if partitioned before, then also add the old table ID,
// otherwise it will be the already included first partition
physicalTableIDs = append(physicalTableIDs, oldTblID)
@@ -3615,6 +3618,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
}

// We need to update the Placement rule bundles with the final partitions.
_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
return ver, err
}

failpoint.Inject("reorgPartFail5", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
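End-to-end, the scenario this cherry-pick fixes can be exercised with a test along these lines. This is a hedged sketch using the standard testkit helpers; the test name and SQL are illustrative and not copied from the PR's test files:

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
)

// Illustrative regression test: partition a non-partitioned table that has a
// placement policy attached. Before this fix, the DDL could write conflicting
// placement bundles, because the first partition temporarily shares the
// table's physical ID.
func TestAlterPartitionByWithPlacement(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create placement policy pp1 followers=1")
	tk.MustExec("create table t (a int) placement policy pp1")
	tk.MustExec("alter table t partition by range (a) (" +
		"partition p0 values less than (1000000)," +
		"partition p1 values less than (maxvalue))")
}
```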