Skip to content

Commit

Permalink
Merge branch 'master' into issue-24109
Browse files Browse the repository at this point in the history
  • Loading branch information
Howie59 committed May 6, 2021
2 parents af095bd + 9d01a36 commit dd653c8
Show file tree
Hide file tree
Showing 189 changed files with 5,513 additions and 1,741 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# TiDB Changelog
All notable changes to this project will be documented in this file. See also [Release Notes](https://github.com/pingcap/docs/blob/master/releases/rn.md), [TiKV Changelog](https://github.com/tikv/tikv/blob/master/CHANGELOG.md) and [PD Changelog](https://github.com/tikv/pd/blob/master/CHANGELOG.md).
All notable changes to this project will be documented in this file. See also [Release Notes](https://github.com/pingcap/docs/blob/master/releases/release-notes.md), [TiKV Changelog](https://github.com/tikv/tikv/blob/master/CHANGELOG.md) and [PD Changelog](https://github.com/tikv/pd/blob/master/CHANGELOG.md).

## [3.0.4] 2019-10-08
## New features
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
priority: tikvstore.PriorityLow,
priority: kv.PriorityLow,
}
}

Expand Down
124 changes: 123 additions & 1 deletion ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ package ddl

import (
"context"
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// Interceptor is used for DDL.
Expand All @@ -39,7 +46,7 @@ func (bi *BaseInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
type Callback interface {
// OnChanged is called after a ddl statement is finished.
OnChanged(err error) error
// OnSchemaStateChange is called after a schema state is changed.
// OnSchemaStateChanged is called after a schema state is changed.
OnSchemaStateChanged()
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
Expand Down Expand Up @@ -77,3 +84,118 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
func (c *BaseCallback) OnWatched(ctx context.Context) {
// Nothing to do.
}

// DomainReloader is used to avoid import loop.
type DomainReloader interface {
Reload() error
}

// ****************************** Start of Customized DDL Callback Instance ****************************************

// DefaultCallback is the default callback that TiDB will use.
type DefaultCallback struct {
*BaseCallback
do DomainReloader
}

// OnChanged overrides ddl Callback interface.
func (c *DefaultCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}

return nil
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *DefaultCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

func newDefaultCallBack(do DomainReloader) Callback {
return &DefaultCallback{do: do}
}

// ****************************** End of Default DDL Callback Instance *********************************************

// ****************************** Start of CTC DDL Callback Instance ***********************************************

// ctcCallback is the customized callback that TiDB will use.
// ctc is named from column type change, here after we call them ctc for short.
type ctcCallback struct {
*BaseCallback
do DomainReloader
}

// OnChanged overrides ddl Callback interface.
func (c *ctcCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
return nil
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *ctcCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (c *ctcCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
// Only block the ctc type ddl here.
if job.Type != model.ActionModifyColumn {
return
}
switch job.SchemaState {
case model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization:
logutil.BgLogger().Warn(fmt.Sprintf("[DDL_HOOK] Hang for 0.5 seconds on %s state triggered", job.SchemaState.String()))
time.Sleep(500 * time.Millisecond)
}
}

func newCTCCallBack(do DomainReloader) Callback {
return &ctcCallback{do: do}
}

// ****************************** End of CTC DDL Callback Instance ***************************************************

var (
customizedCallBackRegisterMap = map[string]func(do DomainReloader) Callback{}
)

func init() {
// init the callback register map.
customizedCallBackRegisterMap["default_hook"] = newDefaultCallBack
customizedCallBackRegisterMap["ctc_hook"] = newCTCCallBack
}

// GetCustomizedHook get the hook registered in the hookMap.
func GetCustomizedHook(s string) (func(do DomainReloader) Callback, error) {
s = strings.ToLower(s)
s = strings.TrimSpace(s)
fact, ok := customizedCallBackRegisterMap[s]
if !ok {
logutil.BgLogger().Error("bad ddl hook " + s)
return nil, errors.Errorf("ddl hook `%s` is not found in hook registered map", s)
}
return fact, nil
}
32 changes: 24 additions & 8 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

Expand All @@ -38,6 +38,7 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
}

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
Expand All @@ -51,16 +52,33 @@ type TestDDLCallback struct {
onWatched func(ctx context.Context)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged() {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
log.Warn("reload failed on schema state changed", zap.Error(err))
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
Expand All @@ -73,8 +91,9 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
log.Info("on job updated", zap.String("job", job.String()))
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if tc.OnJobUpdatedExported != nil {
tc.OnJobUpdatedExported(job)
return
Expand All @@ -87,6 +106,7 @@ func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
Expand All @@ -103,7 +123,3 @@ func (s *testDDLSuite) TestCallback(c *C) {
cb.OnJobUpdated(nil)
cb.OnWatched(context.TODO())
}

type DomainReloader interface {
Reload() error
}
2 changes: 1 addition & 1 deletion ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *testColumnTypeChangeSuite) TestRollbackColumnTypeChangeBetweenInteger(c
SQL := "alter table t modify column c2 int not null"
_, err := tk.Exec(SQL)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-none")
c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-queueing")
assertRollBackedColUnchanged(c, tk)

// Mock roll back at model.StateDeleteOnly.
Expand Down
9 changes: 8 additions & 1 deletion ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,12 @@ create table log_message_1 (

tk.MustExec("drop table if exists t;")
tk.MustExec(`create table t(a int) partition by range (a) (partition p0 values less than (18446744073709551615));`)

tk.MustExec("drop table if exists t;")
tk.MustExec(`create table t(a binary) partition by range columns (a) (partition p0 values less than (X'0C'));`)
tk.MustExec(`alter table t add partition (partition p1 values less than (X'0D'), partition p2 values less than (X'0E'));`)
tk.MustExec(`insert into t values (X'0B'), (X'0C'), (X'0D')`)
tk.MustQuery(`select * from t where a < X'0D'`).Check(testkit.Rows("\x0B", "\x0C"))
}

func (s *testIntegrationSuite1) TestDisableTablePartition(c *C) {
Expand Down Expand Up @@ -743,6 +749,7 @@ func (s *testIntegrationSuite1) TestCreateTableWithListPartition(c *C) {
"create table t (a bigint) partition by list (a) (partition p0 values in (to_seconds('2020-09-28 17:03:38'),to_seconds('2020-09-28 17:03:39')));",
"create table t (a datetime) partition by list (to_seconds(a)) (partition p0 values in (to_seconds('2020-09-28 17:03:38'),to_seconds('2020-09-28 17:03:39')));",
"create table t (a int, b int generated always as (a+1) virtual) partition by list (b + 1) (partition p0 values in (1));",
"create table t(a binary) partition by list columns (a) (partition p0 values in (X'0C'));",
s.generatePartitionTableByNum(ddl.PartitionCountLimit),
}

Expand Down Expand Up @@ -3334,7 +3341,7 @@ func (s *testIntegrationSuite7) TestAddPartitionForTableWithWrongType(c *C) {

_, err = tk.Exec("alter table t_char add partition (partition p1 values less than (0x20))")
c.Assert(err, NotNil)
c.Assert(ddl.ErrWrongTypeColumnValue.Equal(err), IsTrue)
c.Assert(ddl.ErrRangeNotIncreasing.Equal(err), IsTrue)

_, err = tk.Exec("alter table t_char add partition (partition p1 values less than (10))")
c.Assert(err, NotNil)
Expand Down
31 changes: 31 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var _ = Suite(&testDBSuite5{&testDBSuite{}})
var _ = Suite(&testDBSuite6{&testDBSuite{}})
var _ = Suite(&testDBSuite7{&testDBSuite{}})
var _ = SerialSuites(&testSerialDBSuite{&testDBSuite{}})
var _ = Suite(&testDBSuite8{&testDBSuite{}})

const defaultBatchSize = 1024
const defaultReorgBatchSize = 256
Expand Down Expand Up @@ -145,6 +146,7 @@ type testDBSuite5 struct{ *testDBSuite }
type testDBSuite6 struct{ *testDBSuite }
type testDBSuite7 struct{ *testDBSuite }
type testSerialDBSuite struct{ *testDBSuite }
type testDBSuite8 struct{ *testDBSuite }

func testAddIndexWithPK(tk *testkit.TestKit) {
tk.MustExec("drop table if exists test_add_index_with_pk")
Expand Down Expand Up @@ -6700,3 +6702,32 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C)
_, err := tk.Exec("alter table test_add_index_after_add_col add unique index cc(c);")
c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'")
}

// For Close issue #24288
// see https://github.com/pingcap/tidb/issues/24288
func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) {
tk := testkit.NewTestKit(c, s.store)

// create/drop database test
longDbName := strings.Repeat("库", mysql.MaxDatabaseNameLength-1)
tk.MustExec(fmt.Sprintf("create database %s", longDbName))
defer func() {
tk.MustExec(fmt.Sprintf("drop database %s", longDbName))
}()
tk.MustExec(fmt.Sprintf("use %s", longDbName))

// create/drop table,index test
longTblName := strings.Repeat("表", mysql.MaxTableNameLength-1)
longColName := strings.Repeat("三", mysql.MaxColumnNameLength-1)
longIdxName := strings.Repeat("索", mysql.MaxIndexIdentifierLen-1)
tk.MustExec(fmt.Sprintf("create table %s(f1 int primary key,f2 int, %s varchar(50))", longTblName, longColName))
tk.MustExec(fmt.Sprintf("create index %s on %s(%s)", longIdxName, longTblName, longColName))
defer func() {
tk.MustExec(fmt.Sprintf("drop index %s on %s", longIdxName, longTblName))
tk.MustExec(fmt.Sprintf("drop table %s", longTblName))
}()

// alter table
tk.MustExec(fmt.Sprintf("alter table %s change f2 %s int", longTblName, strings.Repeat("二", mysql.MaxColumnNameLength-1)))

}
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,14 @@ type DDL interface {
OwnerManager() owner.Manager
// GetID gets the ddl ID.
GetID() string
// GetTableMaxRowID gets the max row ID of a normal table or a partition.
// GetTableMaxHandle gets the max row ID of a normal table or a partition.
GetTableMaxHandle(startTS uint64, tbl table.PhysicalTable) (kv.Handle, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(*pumpcli.PumpsClient)
// GetHook gets the hook. It's exported for testing.
GetHook() Callback
// SetHook sets the hook.
SetHook(h Callback)
}

type limitJobTask struct {
Expand Down Expand Up @@ -625,6 +627,14 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

// SetHook set the customized hook.
func (d *ddl) SetHook(h Callback) {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.hook = h
}

func (d *ddl) startCleanDeadTableLock() {
defer func() {
goutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)
Expand Down
Loading

0 comments on commit dd653c8

Please sign in to comment.