ddl: Fixed partitioning a non-partitioned table with placement rules #57560

Merged (24 commits) on Nov 29, 2024
Commits (24):
258638a Fixed partitioning a non-partitioned table with placement rules (mjonss, Nov 20, 2024)
93a14c9 Linting (mjonss, Nov 20, 2024)
9210c20 Removed handling of partition bundles, since it was only using table … (mjonss, Nov 20, 2024)
f156b89 Fixed update placement bundles for partitions (mjonss, Nov 21, 2024)
58d859d In REORG PARTITION we first register the intermediate state and then … (mjonss, Nov 21, 2024)
227ccd8 Fixed failure and added tests for rollback of reorg partition and pla… (mjonss, Nov 21, 2024)
9200fbb Added more tests and one minor fix (mjonss, Nov 22, 2024)
d58c8d2 Linting (mjonss, Nov 22, 2024)
54abd6d More advanced comparison for bundle rules leader overlap (mjonss, Nov 22, 2024)
2f97ac7 Renamed a function and added more comments (mjonss, Nov 22, 2024)
7fcdaf3 Updated check for bundle, to match PD (mjonss, Nov 25, 2024)
cb2bd27 Added enhancement issue reference (mjonss, Nov 25, 2024)
f480038 Linting (mjonss, Nov 25, 2024)
2a4642a Linting (mjonss, Nov 25, 2024)
274760f Merge remote-tracking branch 'pingcap/master' into partition-by-place… (mjonss, Nov 26, 2024)
5d78361 bazel_prepare (mjonss, Nov 26, 2024)
acee32e Simplified CheckBundle in mockPlacementManager (mjonss, Nov 26, 2024)
ee5cf49 Linting (mjonss, Nov 26, 2024)
05a6a14 Eased the CheckBundle to pass more test. (mjonss, Nov 26, 2024)
b25fe4a Simplified CheckBundle (mjonss, Nov 28, 2024)
1c43fbe Removed non-needed struct variables for check (mjonss, Nov 28, 2024)
17dfd76 Removed old comment (mjonss, Nov 28, 2024)
fd74392 Added more comments and simplified CheckBundle further (mjonss, Nov 28, 2024)
c6e1ad0 Removed confusing test. (mjonss, Nov 28, 2024)
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -358,6 +358,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
116 changes: 63 additions & 53 deletions pkg/ddl/partition.go
@@ -159,17 +159,12 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
}
}

bundles, err := alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
_, err = alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
@@ -270,33 +265,52 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64)
return false, nil
}

func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDefs []model.PartitionDefinition) (bool, error) {
// We want to achieve:
// - before we do any reorganization/write to new partitions/global indexes that the placement rules are in-place
// - not removing any placement rules for removed partitions
// So we will:
// 1) First write the new bundles including both new and old partitions,
Collaborator:

Why should the new bundles include the old partitions? The old bundles have already defined these rules and they still take effect here.

Contributor Author (mjonss):

Oh, so if the bundle is overwritten, will the older bundle rules still be in effect?
For example, if the first bundle had rules for table id 111 and partition ids 112 and 113, and the new bundle had table id 111 and partition ids 113 and 114, would partition 112 still be placed according to the old bundle rules, or would it be placed without any rules?

Contributor:

or would it be placed without any rules?

This, I believe.

// it will filter the rules depends on the interval index override in the same group or the
// group-index override between different groups
// For example, given rules:
// ruleA: group_id: 4, id: 2, override: true
// ruleB: group_id: 4, id: 1, override: true
// ruleC: group_id: 3
// ruleD: group_id: 2
// RuleGroupA: id:4, override: false
// RuleGroupB: id:3, override: true
// RuleGroupC: id:2, override: false
// Finally only ruleA and ruleC will be selected.
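(To make the "without any rules" conclusion above concrete, here is a minimal, self-contained sketch. The toy bundle type, the TiDB_DDL_111 group name and the IDs 111 to 114 are illustrative assumptions, not the actual TiDB/PD client API: putting a new bundle for the same group replaces the old rule set wholesale, so an ID dropped from the new bundle, like partition 112, is left with no placement rules.)

```go
// Toy model, not the TiDB/PD API: a bundle is keyed by its group ID and holds
// the complete rule set for that group, so PUTting a new bundle for the same
// group replaces the previous rules wholesale.
package main

import "fmt"

type bundle struct {
	groupID    string
	coveredIDs map[int64]bool // table/partition IDs whose key ranges the rules cover
}

func put(store map[string]bundle, b bundle) {
	store[b.groupID] = b // overwrite: previous rules for this group are gone
}

func main() {
	store := map[string]bundle{}
	// Old bundle: table 111, partitions 112 and 113.
	put(store, bundle{"TiDB_DDL_111", map[int64]bool{111: true, 112: true, 113: true}})
	// New bundle: table 111, partitions 113 and 114.
	put(store, bundle{"TiDB_DDL_111", map[int64]bool{111: true, 113: true, 114: true}})

	for _, id := range []int64{111, 112, 113, 114} {
		fmt.Printf("id %d covered: %v\n", id, store["TiDB_DDL_111"].coveredIDs[id])
	}
	// id 112 is no longer covered by any rule, matching the answer above.
}
```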

Contributor Author (mjonss):

A better example:

CREATE TABLE t (a int) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000)) placement policy pp1;
ALTER TABLE t REORGANIZE PARTITION p1M INTO (PARTITION p1M values less than (2000000), partition p2M values less than (3000000));

During the DDL we would still want the original p1M to follow the table-level rule, so we keep it in the bundle.
When the final bundles are created it will not be included.
Since the table keeps its table ID (for REORGANIZE PARTITION), there are no other bundles that cover the inherited table's placement rules for the original partition p1M, which means we need to cover it for the duration of the DDL, since it can still be accessed (and double written). A rough sketch of the two bundle writes is shown below.

Another alternative would be to create partition-level bundles for the old, replaced partitions and let them be removed by GC later, but I think that would qualify for a separate PR.
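(Sketch of the two bundle writes described in this thread, using made-up IDs for the example table: t = 100, p0 = 101, the original p1M = 102, the new p1M = 103, p2M = 104. It only shows which definitions each write is built from; it is not the real alterTablePartitionBundles.)

```go
package main

import "fmt"

func main() {
	existing := []int64{101, 102}   // p0 and the original p1M (still readable and double-written during the DDL)
	adding := []int64{103, 104}     // the new p1M and p2M
	final := []int64{101, 103, 104} // p0, new p1M, p2M

	// 1) While the reorganization runs, the table bundle is built from the old
	//    AND the new partitions, so the replaced p1M keeps following the
	//    table-level placement rule.
	during := append(append([]int64{}, existing...), adding...)
	fmt.Println("during DDL:", during)

	// 2) When the DDL finishes, the bundle is overwritten with the final
	//    partitioning scheme only; the dropped original p1M is left to GC.
	fmt.Println("after DDL: ", final)
}
```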

Contributor Author (mjonss):

I think the rule bundle check improvements would be better addressed in a separate PR; I will update the check and add a unit test here. But looking at the code in PD, the example in the comment for prepareRulesForApply() seems wrong. If I'm reading the code correctly, I would expect it to return ruleC and ruleD, since ruleB would override ruleA, ruleC would override all previous rules because its group has override set, and finally ruleD would just be added.

And it does not check any key ranges, neither in this function nor in checkApplyRules(), which only checks that there is at least one leader or voter, and at most one leader.

Contributor Author (mjonss):

The example would be correct if the rules were ordered in reverse, i.e. according to their GroupID, ID.

Contributor Author (mjonss):

I have changed the logic to only check bundles one by one, and tried to mimic the logic of PD's prepareRulesForApply() and checkApplyRules(); for any other checks I created #57693 as a follow-up.
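(For reference, a minimal sketch of the kind of per-bundle check described here. The Rule type and field names are simplified stand-ins, not the actual mockPlacementManager or PD code; it validates only what checkApplyRules is said to validate above: at least one leader or voter, and at most one leader.)

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for PD's rule types; only the fields needed here.
type PeerRoleType string

const (
	Leader   PeerRoleType = "leader"
	Voter    PeerRoleType = "voter"
	Follower PeerRoleType = "follower"
)

type Rule struct {
	ID    string
	Role  PeerRoleType
	Count int
}

// checkBundleRules mimics the spirit of PD's checkApplyRules: the rules of a
// bundle must supply at least one leader or voter, and at most one leader.
func checkBundleRules(rules []*Rule) error {
	leaders := 0
	votersOrLeaders := 0
	for _, r := range rules {
		switch r.Role {
		case Leader:
			leaders += r.Count
			votersOrLeaders += r.Count
		case Voter:
			votersOrLeaders += r.Count
		}
	}
	if leaders > 1 {
		return fmt.Errorf("multiple leader replicas (%d)", leaders)
	}
	if votersOrLeaders < 1 {
		return errors.New("needs at least one leader or voter")
	}
	return nil
}

func main() {
	ok := []*Rule{{ID: "leader", Role: Leader, Count: 1}, {ID: "followers", Role: Follower, Count: 2}}
	bad := []*Rule{{ID: "followers", Role: Follower, Count: 3}}
	fmt.Println(checkBundleRules(ok))  // <nil>
	fmt.Println(checkBundleRules(bad)) // needs at least one leader or voter
}
```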

// EXCEPT if the old partition is in fact a table, then skip that partition
// 2) Then overwrite the bundles with the final partitioning scheme (second call in onReorg/

// tblInfo do not include added partitions, so we should add them first
tblInfo = tblInfo.Clone()
p := *tblInfo.Partition
p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...)
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p
p := tblInfo.Partition
if p != nil {
// if partitioning a non-partitioned table, we will first change the metadata,
// so the table looks like a partitioned table, with the first/only partition having
// the same partition ID as the table, so we can access the table as a single partition.
// But in this case we should not add a bundle rule for the same range
// both as table and partition.
if p.Definitions[0].ID != tblInfo.ID {
// prepend with existing partitions
addDefs = append(p.Definitions, addDefs...)
}
p.Definitions = addDefs
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

var bundles []*placement.Bundle
if tblBundle != nil {
bundles = append(bundles, tblBundle)
}

partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions)
partitionBundles, err := placement.NewPartitionListBundles(t, addDefs)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

bundles = append(bundles, partitionBundles...)
return bundles, nil

if len(bundles) > 0 {
return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}
return false, nil
}

// When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
@@ -336,20 +350,16 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode
tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...)
}

// rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo.
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, []*placement.Bundle) {
// removePartitionAddingDefinitionsFromTableInfo remove the `addingDefinitions` in the tableInfo.
func removePartitionAddingDefinitionsFromTableInfo(tblInfo *model.TableInfo) ([]int64, []string) {
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions))
rollbackBundles := make([]*placement.Bundle, 0, len(tblInfo.Partition.AddingDefinitions))
for _, one := range tblInfo.Partition.AddingDefinitions {
physicalTableIDs = append(physicalTableIDs, one.ID)
partNames = append(partNames, one.Name.L)
if one.PlacementPolicyRef != nil {
rollbackBundles = append(rollbackBundles, placement.NewBundle(one.ID))
}
}
tblInfo.Partition.AddingDefinitions = nil
return physicalTableIDs, partNames, rollbackBundles
return physicalTableIDs, partNames
}

// checkAddPartitionValue check add Partition Values,
@@ -2116,20 +2126,21 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
// It will drop newly created partitions that has not yet been used, including cleaning
// up label rules and bundles as well as changed indexes due to global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
args, err := model.GetTablePartitionArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// Collect table/partition ids to clean up, through args.OldPhysicalTblIDs
// GC will later also drop matching Placement bundles.
// If we delete them now, it could lead to non-compliant placement or failure during flashback
physicalTableIDs, pNames := removePartitionAddingDefinitionsFromTableInfo(tblInfo)
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
@@ -2144,7 +2155,9 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
if partInfo.NewTableID != 0 {
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
}
// Reset if it was normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
@@ -2167,6 +2180,11 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
tblInfo.Partition.ClearReorgIntermediateInfo()
}

_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3127,11 +3145,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
jobCtx.jobArgs = args
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
return w.rollbackLikeDropPartition(jobCtx, job)
}

tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
@@ -3149,6 +3163,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// The partInfo may have been checked against an older schema version for example.
// If the check is done here, it does not need to be repeated, since no other
// DDL on the same table can be run concurrently.
tblInfo.Partition.DDLAction = job.Type
num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions)
err = checkAddPartitionTooManyPartitions(uint64(num))
if err != nil {
@@ -3276,31 +3291,21 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK
}

bundles, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
changed, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}

if len(bundles) > 0 {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
changesMade = true
}
changesMade = changesMade || changed

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
}
changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids)
changed, err = alterTableLabelRule(job.SchemaName, tblInfo, ids)
changesMade = changesMade || changed
if err != nil {
if !changesMade {
@@ -3326,7 +3331,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3574,7 +3578,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// REMOVE PARTITIONING
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
// TODO: How to carrie over AUTO_INCREMENT etc.?
@@ -3588,7 +3591,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
tblInfo.ID = partInfo.NewTableID
if partInfo.DDLType != pmodel.PartitionTypeNone {
if oldTblID != physicalTableIDs[0] {
// if partitioned before, then also add the old table ID,
// otherwise it will be the already included first partition
physicalTableIDs = append(physicalTableIDs, oldTblID)
@@ -3608,6 +3611,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
}

// We need to update the Placement rule bundles with the final partitions.
_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
return ver, err
}

failpoint.Inject("reorgPartFail5", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2