Skip to content

Commit

Permalink
Merge pull request #3 from crazycs520/support-delete
Browse files Browse the repository at this point in the history
support auto delete partition
  • Loading branch information
crazycs520 authored Jan 6, 2022
2 parents 5ab74c9 + 295aefa commit ed3dee3
Show file tree
Hide file tree
Showing 24 changed files with 9,252 additions and 8,732 deletions.
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ type Config struct {
MaxBallastObjectSize int `toml:"max-ballast-object-size" json:"max-ballast-object-size"`
// BallastObjectSize set the initial size of the ballast object, the unit is byte.
BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"`

Aws AWSConfig `toml:"aws" json:"aws"`
}

type AWSConfig struct {
Region string `toml:"region" json:"region"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
Expand Down Expand Up @@ -776,6 +782,7 @@ var defaultConf = Config{
EnableEnumLengthLimit: true,
StoresRefreshInterval: defTiKVCfg.StoresRefreshInterval,
EnableForwarding: defTiKVCfg.EnableForwarding,
Aws: AWSConfig{Region: "us-west-2"},
}

var (
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
AlterTablePartitionsMoveEngine(ctx sessionctx.Context, stmt *ast.AlterTableMoveStmt) error
AlterTablePartitionsAutoAction(ctx sessionctx.Context, stmt *ast.AlterTablePartitionsAutoActionStmt) error
AlterTablePartitionMeta(ctx sessionctx.Context, dbInfo *model.DBInfo, tbInfo *model.TableInfo, info *AlterTablePartitionInfo) (err error)

// CreateSchemaWithInfo creates a database (schema) given its database info.
Expand Down
38 changes: 25 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6714,24 +6714,29 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem

var supportedEngineList = []string{"aws_s3"}

func (d *ddl) AlterTablePartitionsMoveEngine(ctx sessionctx.Context, stmt *ast.AlterTableMoveStmt) (err error) {
func (d *ddl) AlterTablePartitionsAutoAction(ctx sessionctx.Context, stmt *ast.AlterTablePartitionsAutoActionStmt) (err error) {
tbInfo := stmt.Table.TableInfo
// check range partition
pi := tbInfo.GetPartitionInfo()
if pi == nil || pi.Type != model.PartitionTypeRange || len(pi.Columns) > 0 {
return errors.New("operation only supported in range partition table")
}
// check engine
found := false
engineName := strings.ToLower(stmt.EngineName)
for _, e := range supportedEngineList {
if engineName == e {
found = true
break

autoInfo := &AutoActionInfo{Action: stmt.Action}
if stmt.Action == ast.AutoActionMove {
// check engine
found := false
engineName := strings.ToLower(stmt.EngineName)
for _, e := range supportedEngineList {
if engineName == e {
found = true
break
}
}
}
if !found {
return errors.Errorf("unknown engine %v", stmt.EngineName)
if !found {
return errors.Errorf("unknown engine %v", stmt.EngineName)
}
autoInfo.Engine = engineName
}
// check expression
err = checkPartitionValuesIsInt(ctx, &ast.PartitionDefinition{}, []ast.ExprNode{stmt.LessThanExpr}, tbInfo)
Expand All @@ -6745,20 +6750,27 @@ func (d *ddl) AlterTablePartitionsMoveEngine(ctx sessionctx.Context, stmt *ast.A
if err != nil {
return err
}
autoInfo.LessThanExpr = buf.String()

job := &model.Job{
SchemaID: stmt.Table.DBInfo.ID,
SchemaName: stmt.Table.DBInfo.Name.L,
TableID: tbInfo.ID,
Type: model.ActionAlterTablePartitionsMove,
Type: model.ActionAlterTablePartitionsAutoAction,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{buf.String(), engineName},
Args: []interface{}{autoInfo},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

type AutoActionInfo struct {
Action ast.AutoActionType `json:"type"`
LessThanExpr string `json:"expr"`
Engine string `json:"engine"`
}

type AlterTablePartitionInfo struct {
PID int64 `json:"pid"`
ReadOnly bool `json:"readonly"`
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterCacheTable(t, job)
case model.ActionAlterNoCacheTable:
ver, err = onAlterNoCacheTable(t, job)
case model.ActionAlterTablePartitionsMove:
ver, err = onAlterTablePartitionsMove(t, job)
case model.ActionAlterTablePartitionsAutoAction:
ver, err = onAlterTablePartitionsAutoAction(t, job)
case model.ActionAlterTablePartitionMeta:
ver, err = onAlterTablePartitionMeta(t, job)
default:
Expand Down
16 changes: 11 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,9 +1573,9 @@ func onAlterNoCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) {
return ver, err
}

func onAlterTablePartitionsMove(t *meta.Meta, job *model.Job) (ver int64, err error) {
var exprStr, engineName string
if err := job.DecodeArgs(&exprStr, &engineName); err != nil {
func onAlterTablePartitionsAutoAction(t *meta.Meta, job *model.Job) (ver int64, err error) {
var autoActionInfo AutoActionInfo
if err := job.DecodeArgs(&autoActionInfo); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -1592,8 +1592,14 @@ func onAlterTablePartitionsMove(t *meta.Meta, job *model.Job) (ver int64, err er
job.State = model.JobStateCancelled
return ver, ErrInvalidDDLState.GenWithStack("operation only supported in range partition table")
}
pi.Interval.MovePartitionExpr = exprStr
pi.Interval.Engine = engineName

switch autoActionInfo.Action {
case ast.AutoActionMove:
pi.AutoAction.MovePartitionExpr = autoActionInfo.LessThanExpr
pi.AutoAction.MoveToEngine = autoActionInfo.Engine
case ast.AutoActionDelete:
pi.AutoAction.DeletePartitionExpr = autoActionInfo.LessThanExpr
}

ver, err = updateVersionAndTableInfo(t, job, tbInfo, true)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeDropPlacementPolicy(x)
case *ast.AlterPlacementPolicyStmt:
err = e.executeAlterPlacementPolicy(x)
case *ast.AlterTableMoveStmt:
case *ast.AlterTablePartitionsAutoActionStmt:
err = e.executeAlterTablePartitionsMoveEngine(x)
}
if err != nil {
Expand Down Expand Up @@ -876,6 +876,6 @@ func (e *DDLExec) executeAlterPlacementPolicy(s *ast.AlterPlacementPolicyStmt) e
return domain.GetDomain(e.ctx).DDL().AlterPlacementPolicy(e.ctx, s)
}

func (e *DDLExec) executeAlterTablePartitionsMoveEngine(s *ast.AlterTableMoveStmt) error {
return domain.GetDomain(e.ctx).DDL().AlterTablePartitionsMoveEngine(e.ctx, s)
func (e *DDLExec) executeAlterTablePartitionsMoveEngine(s *ast.AlterTablePartitionsAutoActionStmt) error {
return domain.GetDomain(e.ctx).DDL().AlterTablePartitionsAutoAction(e.ctx, s)
}
41 changes: 41 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,3 +1617,44 @@ func (s *testRecoverTable) TestRenameMultiTables(c *C) {
tk.MustExec("drop database rename2")
tk.MustExec("drop database rename3")
}

func (s *testSuite6) TestTablePartitionAutoAction(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int) partition by range(a) INTERVAL 1000 (PARTITION p0 VALUES LESS THAN (1000));")

tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tbInfo := tbl.Meta()
pi := tbInfo.GetPartitionInfo()
c.Assert(pi, NotNil)
c.Assert(pi.Interval.Enable, IsTrue)
c.Assert(pi.Interval.AutoIntervalValue, Equals, int64(1000))
c.Assert(pi.Interval.AutoIntervalUnit, Equals, "")

tk.MustExec("ALTER TABLE t MOVE PARTITIONS VALUES LESS THAN 1000 TO ENGINE AWS_S3;")
tbl, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tbInfo = tbl.Meta()
pi = tbInfo.GetPartitionInfo()
c.Assert(pi, NotNil)
c.Assert(pi.Interval.Enable, IsTrue)
c.Assert(pi.Interval.AutoIntervalValue, Equals, int64(1000))
c.Assert(pi.Interval.AutoIntervalUnit, Equals, "")
c.Assert(pi.AutoAction.MovePartitionExpr, Equals, "1000")
c.Assert(pi.AutoAction.MoveToEngine, Equals, "aws_s3")

tk.MustExec("ALTER TABLE t DELETE PARTITIONS VALUES LESS THAN CAST(UNIX_TIMESTAMP(DATE_SUB(NOW(), interval 1 YEAR)) as SIGNED) ;")
tbl, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tbInfo = tbl.Meta()
pi = tbInfo.GetPartitionInfo()
c.Assert(pi, NotNil)
c.Assert(pi.Interval.Enable, IsTrue)
c.Assert(pi.Interval.AutoIntervalValue, Equals, int64(1000))
c.Assert(pi.Interval.AutoIntervalUnit, Equals, "")
c.Assert(pi.AutoAction.MovePartitionExpr, Equals, "1000")
c.Assert(pi.AutoAction.MoveToEngine, Equals, "aws_s3")
c.Assert(pi.AutoAction.DeletePartitionExpr, Equals, "CAST(UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 YEAR)) AS SIGNED)")
}
Loading

0 comments on commit ed3dee3

Please sign in to comment.