Skip to content

Commit

Permalink
Merge branch 'master' into fk-test-6
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 21, 2022
2 parents 9ba0d2f + 51cce45 commit 1795f56
Show file tree
Hide file tree
Showing 60 changed files with 984 additions and 261 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4467,8 +4467,8 @@ def go_deps():
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
var genCols []genCol
for i, col := range cols {
if col.GeneratedExpr != nil {
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names, false)
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
}

func (b *backfillScheduler) canSkipError(err error) bool {
Expand Down
6 changes: 3 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6172,7 +6172,7 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
indexColumns, _, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -6282,7 +6282,7 @@ func BuildHiddenColumnInfo(ctx sessionctx.Context, indexPartSpecifications []*as
if err != nil {
return nil, errors.Trace(err)
}
expr, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, idxPart.Expr)
expr, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, idxPart.Expr, true)
if err != nil {
// TODO: refine the error message.
return nil, err
Expand Down Expand Up @@ -6397,7 +6397,7 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications)
indexColumns, _, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down
38 changes: 34 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
}

// checkMDLInfo checks if metadata lock info exists. It means the schema is locked by some TiDBs if exists.
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
if err != nil {
return false, err
return false, 0, err
}
return len(rows) > 0, nil
if len(rows) == 0 {
return false, 0, nil
}
ver := rows[0].GetInt64(0)
return true, ver, nil
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
Expand Down Expand Up @@ -1377,6 +1381,32 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
zap.String("job", job.String()))
}

// waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
} else {
mockDDLErrOnce = -1
}
}
})

timeStart := time.Now()
// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSynced handles the following situation:
// If the job enters a new state, and the worker crashs when it's in the process of waiting for 2 * lease time,
// Then the worker restarts quickly, we may run the job immediately again,
Expand Down
4 changes: 2 additions & 2 deletions ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func SetBatchInsertDeleteRangeSize(i int) {

var NewCopContext4Test = newCopContext

func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage,
batchSize int) ([]*indexRecord, bool, error) {
variable.SetDDLReorgBatchSize(int32(batchSize))
task := &reorgBackfillTask{
id: 1,
startKey: startKey,
endKey: endKey,
}
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool := newCopReqSenderPool(context.Background(), copCtx, store)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
Expand Down
18 changes: 18 additions & 0 deletions ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,24 @@ func TestCreateTableWithForeignKeyPrivilegeCheck(t *testing.T) {
tk2.MustExec("create table t4 (a int, foreign key fk(a) references t1(id), foreign key (a) references t3(id));")
}

func TestAlterTableWithForeignKeyPrivilegeCheck(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create user 'u1'@'%' identified by '';")
tk.MustExec("grant create,alter on *.* to 'u1'@'%';")
tk.MustExec("create table t1 (id int key);")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.Session().Auth(&auth.UserIdentity{Username: "u1", Hostname: "localhost", CurrentUser: true, AuthUsername: "u1", AuthHostname: "%"}, nil, []byte("012345678901234567890"))
tk2.MustExec("create table t2 (a int)")
err := tk2.ExecToErr("alter table t2 add foreign key (a) references t1 (id) on update cascade")
require.Error(t, err)
require.Equal(t, "[planner:1142]REFERENCES command denied to user 'u1'@'%' for table 't1'", err.Error())
tk.MustExec("grant references on test.t1 to 'u1'@'%';")
tk2.MustExec("alter table t2 add foreign key (a) references t1 (id) on update cascade")
}

func TestRenameTableWithForeignKeyMetaInfo(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
24 changes: 18 additions & 6 deletions ddl/generated_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,14 @@ func checkModifyGeneratedColumn(sctx sessionctx.Context, tbl table.Table, oldCol
}

type illegalFunctionChecker struct {
hasIllegalFunc bool
hasAggFunc bool
hasRowVal bool // hasRowVal checks whether the functional index refers to a row value
hasWindowFunc bool
hasNotGAFunc4ExprIdx bool
otherErr error
hasIllegalFunc bool
hasAggFunc bool
hasRowVal bool // hasRowVal checks whether the functional index refers to a row value
hasWindowFunc bool
hasNotGAFunc4ExprIdx bool
hasCastArrayFunc bool
disallowCastArrayFunc bool
otherErr error
}

func (c *illegalFunctionChecker) Enter(inNode ast.Node) (outNode ast.Node, skipChildren bool) {
Expand Down Expand Up @@ -308,7 +310,14 @@ func (c *illegalFunctionChecker) Enter(inNode ast.Node) (outNode ast.Node, skipC
case *ast.WindowFuncExpr:
c.hasWindowFunc = true
return inNode, true
case *ast.FuncCastExpr:
c.hasCastArrayFunc = c.hasCastArrayFunc || node.Tp.IsArray()
if c.disallowCastArrayFunc && node.Tp.IsArray() {
c.otherErr = expression.ErrNotSupportedYet.GenWithStackByArgs("Use of CAST( .. AS .. ARRAY) outside of functional index in CREATE(non-SELECT)/ALTER TABLE or in general expressions")
return inNode, true
}
}
c.disallowCastArrayFunc = true
return inNode, false
}

Expand Down Expand Up @@ -355,6 +364,9 @@ func checkIllegalFn4Generated(name string, genType int, expr ast.ExprNode) error
if genType == typeIndex && c.hasNotGAFunc4ExprIdx && !config.GetGlobalConfig().Experimental.AllowsExpressionIndex {
return dbterror.ErrUnsupportedExpressionIndex
}
if genType == typeColumn && c.hasCastArrayFunc {
return expression.ErrNotSupportedYet.GenWithStackByArgs("Use of CAST( .. AS .. ARRAY) outside of functional index in CREATE(non-SELECT)/ALTER TABLE or in general expressions")
}
return nil
}

Expand Down
21 changes: 12 additions & 9 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,28 @@ var (
telemetryAddIndexIngestUsage = metrics.TelemetryAddIndexIngestCnt
)

func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, error) {
func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, bool, error) {
// Build offsets.
idxParts := make([]*model.IndexColumn, 0, len(indexPartSpecifications))
var col *model.ColumnInfo
var mvIndex bool
maxIndexLength := config.GetGlobalConfig().MaxIndexLength
// The sum of length of all index columns.
sumLength := 0
for _, ip := range indexPartSpecifications {
col = model.FindColumnInfo(columns, ip.Column.Name.L)
if col == nil {
return nil, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name)
return nil, false, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name)
}

if err := checkIndexColumn(ctx, col, ip.Length); err != nil {
return nil, err
return nil, false, err
}
mvIndex = mvIndex || col.FieldType.IsArray()
indexColLen := ip.Length
indexColumnLength, err := getIndexColumnLength(col, ip.Length)
if err != nil {
return nil, err
return nil, false, err
}
sumLength += indexColumnLength

Expand All @@ -92,12 +94,12 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde
// The multiple column index and the unique index in which the length sum exceeds the maximum size
// will return an error instead produce a warning.
if ctx == nil || ctx.GetSessionVars().StrictSQLMode || mysql.HasUniKeyFlag(col.GetFlag()) || len(indexPartSpecifications) > 1 {
return nil, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
return nil, false, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
}
// truncate index length and produce warning message in non-restrict sql mode.
colLenPerUint, err := getIndexColumnLength(col, 1)
if err != nil {
return nil, err
return nil, false, err
}
indexColLen = maxIndexLength / colLenPerUint
// produce warning message
Expand All @@ -111,7 +113,7 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde
})
}

return idxParts, nil
return idxParts, mvIndex, nil
}

// CheckPKOnGeneratedColumn checks the specification of PK is valid.
Expand Down Expand Up @@ -154,7 +156,7 @@ func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumn
}

// JSON column cannot index.
if col.FieldType.GetType() == mysql.TypeJSON {
if col.FieldType.GetType() == mysql.TypeJSON && !col.FieldType.IsArray() {
if col.Hidden {
return dbterror.ErrFunctionalIndexOnJSONOrGeometryFunction
}
Expand Down Expand Up @@ -263,7 +265,7 @@ func BuildIndexInfo(
return nil, errors.Trace(err)
}

idxColumns, err := buildIndexColumns(ctx, allTableColumns, indexPartSpecifications)
idxColumns, mvIndex, err := buildIndexColumns(ctx, allTableColumns, indexPartSpecifications)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -276,6 +278,7 @@ func BuildIndexInfo(
Primary: isPrimary,
Unique: isUnique,
Global: isGlobal,
MVIndex: mvIndex,
}

if indexOption != nil {
Expand Down
17 changes: 11 additions & 6 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type copReqSenderPool struct {
resultsCh chan idxRecResult
results generic.SyncMap[int, struct{}]

ctx context.Context
copCtx *copContext
startTS uint64
ctx context.Context
copCtx *copContext
store kv.Storage

senders []*copReqSender
wg sync.WaitGroup
Expand Down Expand Up @@ -139,7 +139,12 @@ func (c *copReqSender) run() {
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
Expand Down Expand Up @@ -167,7 +172,7 @@ func (c *copReqSender) run() {
}
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool {
poolSize := copReadChunkPoolSize()
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
Expand All @@ -181,7 +186,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
store: store,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_cop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(t, err)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10)
require.NoError(t, err)
require.False(t, done)
require.NoError(t, txn.Rollback())
Expand Down
6 changes: 2 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if variable.EnableMDL.Load() {
exist, err := checkMDLInfo(job.ID, d.sessPool)
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
// Release the worker resource.
Expand All @@ -246,10 +246,8 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
} else if exist {
// Release the worker resource.
pool.put(wk)
err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
err = waitSchemaSyncedForMDL(d.ddlCtx, job, version)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
d.once.Store(false)
Expand Down
2 changes: 1 addition & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ func checkPartitionFuncType(ctx sessionctx.Context, expr ast.ExprNode, tblInfo *
return nil
}

e, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, expr)
e, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, expr, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/design/2020-08-04-global-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ In TiDB, operators in the partitioned table will be translated to UnionAll in th

## Compatibility

MySQL does not support global index, which means this feature may cause some compatibility issues. We add an option `enable_global_index` in `config.Config` to control it. The default value of this option is `false`, so TiDB will keep consistent with MySQL, unless the user open global index feature manually.
MySQL does not support global index, which means this feature may cause some compatibility issues. We add an option `enable-global-index` in `config.Config` to control it. The default value of this option is `false`, so TiDB will keep consistent with MySQL, unless the user open global index feature manually.

## Implementation

Expand Down
2 changes: 1 addition & 1 deletion executor/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_test(
"main_test.go",
],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//domain",
Expand All @@ -30,6 +29,7 @@ go_test(
"//tablecodec",
"//testkit",
"//types",
"//util",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
Loading

0 comments on commit 1795f56

Please sign in to comment.