Skip to content

Commit

Permalink
Merge branch 'master' into pingcap#18056
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Sep 25, 2020
2 parents fb3c6c7 + 0673f33 commit f2a0b2b
Show file tree
Hide file tree
Showing 88 changed files with 101,838 additions and 1,150 deletions.
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ HashJoin_23 4166.67 root left outer join, equal:[eq(test.t1.c2, test.t2.c1)]
└─TableFullScan_34 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id estRows task access object operator info
Update_2 N/A root N/A
Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t1 handle:1
explain delete from t1 where t1.c2 = 1;
id estRows task access object operator info
Expand Down Expand Up @@ -687,7 +687,7 @@ begin;
insert into t values (1, 1);
explain update t set j = -j where i = 1 and j = 1;
id estRows task access object operator info
Update_2 N/A root N/A
Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t, index:i(i, j)
rollback;
drop table if exists t;
Expand Down
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_easy_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ HashJoin_22 2481.25 root left outer join, equal:[eq(test.t1.c2, test.t2.c1)]
└─TableRangeScan_32 1998.00 cop[tikv] table:t1 range:(1,+inf], keep order:false
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id estRows task access object operator info
Update_2 N/A root N/A
Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t1 handle:1
explain delete from t1 where t1.c2 = 1;
id estRows task access object operator info
Expand Down
16 changes: 16 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,22 @@ func (s *testIntegrationSuite3) TestPartitionErrorCode(c *C) {
tk.MustGetErrCode("alter table t_part rebuild partition p0,p1;", tmysql.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part remove partitioning;", tmysql.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part repair partition p1;", tmysql.ErrUnsupportedDDLOperation)

// Reduce the impact on DML when executing partition DDL
tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t;")
tk1.MustExec(`create table t(id int primary key)
partition by hash(id) partitions 4;`)
tk1.MustExec("begin")
tk1.MustExec("insert into t values(1);")

tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")
tk2.MustExec("alter table t truncate partition p0;")

_, err = tk1.Exec("commit")
c.Assert(err, IsNil)
}

func (s *testIntegrationSuite5) TestConstAndTimezoneDepent(c *C) {
Expand Down
6 changes: 4 additions & 2 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2235,7 +2235,8 @@ func (s *testDBSuite6) TestDropColumn(c *C) {
tk.MustExec("create table t1 (a int,b int) partition by hash(a) partitions 4;")
_, err := tk.Exec("alter table t1 drop column a")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")
// TODO: refine the error message to compatible with MySQL
c.Assert(err.Error(), Equals, "[planner:1054]Unknown column 'a' in 'expression'")

tk.MustExec("drop database drop_col_db")
}
Expand Down Expand Up @@ -5071,7 +5072,8 @@ func (s *testDBSuite2) TestDDLWithInvalidTableInfo(c *C) {
// Test drop partition column.
_, err = tk.Exec("alter table t drop column a;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")
// TODO: refine the error message to compatible with MySQL
c.Assert(err.Error(), Equals, "[planner:1054]Unknown column 'a' in 'expression'")
// Test modify column with invalid expression.
_, err = tk.Exec("alter table t modify column c int GENERATED ALWAYS AS ((case when (a = 0) then 0when (a > 0) then (b / a) end));")
c.Assert(err, NotNil)
Expand Down
3 changes: 3 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ func (d *ddl) startCleanDeadTableLock() {
if !d.ownerManager.IsOwner() {
continue
}
if d.infoHandle == nil || !d.infoHandle.IsValid() {
continue
}
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas())
if err != nil {
logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err))
Expand Down
171 changes: 92 additions & 79 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5321,16 +5321,15 @@ func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, inde
return errors.Trace(err)
}

func buildPlacementSpecReplicasAndConstraint(rule *placement.RuleOp, replicas uint64, cnstr string) ([]*placement.RuleOp, error) {
func buildPlacementSpecReplicasAndConstraint(replicas uint64, cnstr string) ([]*placement.Rule, error) {
var err error
cnstr = strings.TrimSpace(cnstr)
rules := make([]*placement.RuleOp, 0, 1)
rules := make([]*placement.Rule, 0, 1)
if len(cnstr) > 0 && cnstr[0] == '[' {
// can not emit REPLICAS with an array label
if replicas == 0 {
return rules, errors.Errorf("array CONSTRAINTS should be with a positive REPLICAS")
}
rule.Count = int(replicas)

constraints := []string{}

Expand All @@ -5339,12 +5338,15 @@ func buildPlacementSpecReplicasAndConstraint(rule *placement.RuleOp, replicas ui
return rules, err
}

rule.LabelConstraints, err = placement.CheckLabelConstraints(constraints)
labelConstraints, err := placement.CheckLabelConstraints(constraints)
if err != nil {
return rules, err
}

rules = append(rules, rule)
rules = append(rules, &placement.Rule{
Count: int(replicas),
LabelConstraints: labelConstraints,
})
} else if len(cnstr) > 0 && cnstr[0] == '{' {
constraints := map[string]int{}
err = json.Unmarshal([]byte(cnstr), &constraints)
Expand All @@ -5354,7 +5356,6 @@ func buildPlacementSpecReplicasAndConstraint(rule *placement.RuleOp, replicas ui

ruleCnt := int(replicas)
for labels, cnt := range constraints {
newRule := rule.Clone()
if cnt <= 0 {
err = errors.Errorf("count should be positive, but got %d", cnt)
break
Expand All @@ -5367,108 +5368,91 @@ func buildPlacementSpecReplicasAndConstraint(rule *placement.RuleOp, replicas ui
break
}
}
newRule.Count = cnt

newRule.LabelConstraints, err = placement.CheckLabelConstraints(strings.Split(strings.TrimSpace(labels), ","))
labelConstraints, err := placement.CheckLabelConstraints(strings.Split(strings.TrimSpace(labels), ","))
if err != nil {
break
}
rules = append(rules, newRule)
rules = append(rules, &placement.Rule{
Count: cnt,
LabelConstraints: labelConstraints,
})
}
rule.Count = ruleCnt

if rule.Count > 0 {
rules = append(rules, rule)
if ruleCnt > 0 {
rules = append(rules, &placement.Rule{
Count: ruleCnt,
})
}
} else {
err = errors.Errorf("constraint should be a JSON array or object, but got '%s'", cnstr)
}
return rules, err
}

func buildPlacementSpecs(specs []*ast.PlacementSpec) ([]*placement.RuleOp, error) {
rules := make([]*placement.RuleOp, 0, len(specs))

func buildPlacementSpecs(bundle *placement.Bundle, specs []*ast.PlacementSpec) (*placement.Bundle, error) {
var err error
var sb strings.Builder
restoreFlags := format.RestoreStringSingleQuotes | format.RestoreKeyWordLowercase | format.RestoreNameBackQuotes
restoreCtx := format.NewRestoreCtx(restoreFlags, &sb)
var spec *ast.PlacementSpec

for _, spec := range specs {
rule := &placement.RuleOp{
Rule: &placement.Rule{
GroupID: placement.RuleDefaultGroupID,
Override: true,
},
}
for _, rspec := range specs {
spec = rspec

var role placement.PeerRoleType
switch spec.Role {
case ast.PlacementRoleFollower:
rule.Role = placement.Follower
role = placement.Follower
case ast.PlacementRoleLeader:
rule.Role = placement.Leader
role = placement.Leader
case ast.PlacementRoleLearner:
rule.Role = placement.Learner
role = placement.Learner
case ast.PlacementRoleVoter:
rule.Role = placement.Voter
role = placement.Voter
default:
err = errors.Errorf("unknown role: %d", spec.Role)
}
if err != nil {
break
}

if err == nil {
switch spec.Tp {
case ast.PlacementAdd:
rule.Action = placement.RuleOpAdd
case ast.PlacementAlter, ast.PlacementDrop:
rule.Action = placement.RuleOpAdd

// alter will overwrite all things
// drop all rules that will be overridden
newRules := rules[:0]

for _, r := range rules {
if r.Role != rule.Role {
newRules = append(newRules, r)
}
if spec.Tp == ast.PlacementAlter || spec.Tp == ast.PlacementDrop {
newRules := bundle.Rules[:0]
for _, r := range bundle.Rules {
if r.Role != role {
newRules = append(newRules, r)
}
}
bundle.Rules = newRules

rules = newRules

// delete previous definitions
rules = append(rules, &placement.RuleOp{
Action: placement.RuleOpDel,
DeleteByIDPrefix: true,
Rule: &placement.Rule{
GroupID: placement.RuleDefaultGroupID,
// ROLE is useless for PD, prevent two alter statements from coexisting
Role: rule.Role,
},
})

// alter == drop + add new rules
if spec.Tp == ast.PlacementDrop {
continue
}
default:
err = errors.Errorf("unknown action type: %d", spec.Tp)
// alter == drop + add new rules
if spec.Tp == ast.PlacementDrop {
continue
}
}

if err == nil {
var newRules []*placement.RuleOp
newRules, err = buildPlacementSpecReplicasAndConstraint(rule, spec.Replicas, spec.Constraints)
rules = append(rules, newRules...)
var newRules []*placement.Rule
newRules, err = buildPlacementSpecReplicasAndConstraint(spec.Replicas, spec.Constraints)
if err != nil {
break
}
for _, r := range newRules {
r.Role = role
bundle.Rules = append(bundle.Rules, r)
}
}

if err != nil {
sb.Reset()
if e := spec.Restore(restoreCtx); e != nil {
return rules, ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}
return rules, ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
if err != nil {
var sb strings.Builder
sb.Reset()

restoreCtx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, &sb)

if e := spec.Restore(restoreCtx); e != nil {
return nil, ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}

return nil, ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
}
return rules, nil

return bundle, nil
}

func (d *ddl) AlterTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
Expand All @@ -5487,26 +5471,55 @@ func (d *ddl) AlterTablePartition(ctx sessionctx.Context, ident ast.Ident, spec
return errors.Trace(err)
}

rules, err := buildPlacementSpecs(spec.PlacementSpecs)
pid := placement.GroupID(partitionID)

oldBundle, ok := d.infoHandle.Get().BundleByName(pid)
if !ok {
oldBundle = &placement.Bundle{ID: pid}
} else {
oldBundle = oldBundle.Clone()
}

bundle, err := buildPlacementSpecs(oldBundle, spec.PlacementSpecs)
if err != nil {
return errors.Trace(err)
}

var extraCnt int
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(partitionID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(partitionID+1)))
for _, rule := range rules {
rule.Index = placement.RuleIndexPartition
newRules := bundle.Rules[:0]
for i, rule := range bundle.Rules {
// merge all empty constraints
if len(rule.LabelConstraints) == 0 {
extraCnt += rule.Count
continue
}
rule.GroupID = bundle.ID
rule.ID = strconv.Itoa(i)
rule.StartKeyHex = startKey
rule.EndKeyHex = endKey
newRules = append(newRules, rule)
}
if extraCnt > 0 {
bundle.Rules = append(newRules, &placement.Rule{
GroupID: bundle.ID,
ID: "default",
Count: extraCnt,
StartKeyHex: startKey,
EndKeyHex: endKey,
})
}
bundle.Index = placement.RuleIndexPartition
bundle.Override = true

job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionAlterTableAlterPartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partitionID, rules},
Args: []interface{}{partitionID, bundle},
}

err = d.doDDLJob(ctx, job)
Expand Down
16 changes: 16 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,22 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
OldTableID: ptTableID,
}
diff.AffectedOpts = affects
case model.ActionTruncateTablePartition:
var oldIDs []int64
err = job.DecodeArgs(&oldIDs)
if err != nil {
return 0, errors.Trace(err)
}
diff.TableID = job.TableID
affects := make([]*model.AffectedOption, len(oldIDs))
for i := 0; i < len(oldIDs); i++ {
affects[i] = &model.AffectedOption{
SchemaID: job.SchemaID,
TableID: oldIDs[i],
OldTableID: oldIDs[i],
}
}
diff.AffectedOpts = affects
default:
diff.TableID = job.TableID
}
Expand Down
Loading

0 comments on commit f2a0b2b

Please sign in to comment.