
Commit

This is an automated cherry-pick of pingcap#49089
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Leavrth authored and ti-chi-bot committed Dec 27, 2023
1 parent 4d53073 commit f10bf0e
Showing 11 changed files with 770 additions and 170 deletions.
19 changes: 18 additions & 1 deletion br/pkg/gluetidb/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//br/pkg/glue",
"//br/pkg/gluetikv",
"//br/pkg/logutil",
<<<<<<< HEAD
"//config",
"//ddl",
"//domain",
@@ -19,6 +20,17 @@
"//parser/mysql",
"//session",
"//sessionctx",
=======
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/executor",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/session",
"//pkg/session/types",
"//pkg/sessionctx",
>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089))
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_tikv_pd_client//:client",
@@ -32,9 +44,9 @@ go_test(
srcs = ["glue_test.go"],
embed = [":gluetidb"],
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/glue",
<<<<<<< HEAD
"//ddl",
"//kv",
"//meta",
@@ -43,6 +55,11 @@
"//testkit",
"//types",
"@com_github_pingcap_failpoint//:failpoint",
=======
"//pkg/parser/model",
"//pkg/testkit",
"//pkg/types",
>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089))
"@com_github_stretchr_testify//require",
],
)
142 changes: 20 additions & 122 deletions br/pkg/gluetidb/glue.go
@@ -3,15 +3,19 @@
package gluetidb

import (
"bytes"
"context"
<<<<<<< HEAD
"strings"
=======
"time"
>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089))

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/gluetikv"
"github.com/pingcap/tidb/br/pkg/logutil"
<<<<<<< HEAD
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
@@ -22,6 +26,17 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
=======
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/sessionctx"
>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089))
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)
@@ -32,11 +47,7 @@ var (
_ glue.Glue = Glue{}
)

const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
brComment = `/*from(br)*/`
)
const brComment = `/*from(br)*/`

// New makes a new tidb glue.
func New() Glue {
@@ -201,17 +212,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...

// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateDatabase(schema)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
return errors.Trace(executor.BRIECreateDatabase(gs.se, schema, brComment))
}

// CreatePlacementPolicy implements glue.Session.
@@ -222,95 +223,16 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr,
infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...)
if kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
}
mid := len(infos) / 2
err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...)
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...)
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr

// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
for db, tablesInDB := range tables {
dbName = model.NewCIStr(db)
queryBuilder := strings.Builder{}
cloneTables := make([]*model.TableInfo, 0, len(tablesInDB))
for _, table := range tablesInDB {
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}

queryBuilder.WriteString(query)
queryBuilder.WriteString(";")

table = table.Clone()
// Clone() does not clone partitions yet :(
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
log.Warn("batch create table from tidb failure", zap.Error(err))
return err
}
}

return nil
return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...))
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...))
}

// Close implements glue.Session.
@@ -323,30 +245,6 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
table.AutoIncID = 0
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo.
func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}
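
For reference, the new executor.BRIECreateDatabase/BRIECreateTable(s) calls above replace the showCreateDatabase/showCreateTable helpers that this diff deletes. The following is a minimal sketch of what BRIECreateDatabase presumably does, reconstructed from the removed glue.go logic; the real helper lives in executor/brie_utils.go (added by this commit but not shown in this excerpt), and the import paths below follow the pkg/ layout from the conflict's upstream side.

package executor

import (
	"bytes"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/parser/model"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/sessionctx"
)

// BRIECreateDatabase (sketch): rebuild the SHOW CREATE DATABASE text, prefix it
// with the /*from(br)*/ marker, and hand the schema to the DDL layer, ignoring
// "already exists" errors so restores stay idempotent.
func BRIECreateDatabase(sctx sessionctx.Context, schema *model.DBInfo, brComment string) error {
	d := domain.GetDomain(sctx).DDL()

	// Attach a query string to the session so the resulting DDL job can be
	// traced back to BR.
	var buf bytes.Buffer
	buf.WriteString(brComment)
	if err := ConstructResultOfShowCreateDatabase(sctx, schema, true, &buf); err != nil {
		return errors.Trace(err)
	}
	sctx.SetValue(sessionctx.QueryString, buf.String())

	// Default the charset before creating the schema, as the old glue code did.
	schema = schema.Clone()
	if len(schema.Charset) == 0 {
		schema.Charset = mysql.DefaultCharset
	}
	return d.CreateSchemaWithInfo(sctx, schema, ddl.OnExistIgnore)
}
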
11 changes: 9 additions & 2 deletions br/pkg/gluetidb/glue_test.go
@@ -16,11 +16,10 @@ package gluetidb

import (
"context"
"strconv"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/glue"
<<<<<<< HEAD
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@@ -216,6 +215,14 @@ func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge"))
}

=======
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)

>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089))
func TestTheSessionIsoation(t *testing.T) {
req := require.New(t)
store, _ := testkit.CreateMockStoreAndDomain(t)
47 changes: 27 additions & 20 deletions br/pkg/restore/client.go
@@ -230,24 +230,22 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error {
rc.backupMeta = new(backuppb.BackupMeta)
}

// Only in binary we can use multi-thread sessions to create tables.
// so use OwnStorage() to tell whether we are use binary or SQL.
if g.OwnsStorage() {
// Maybe allow user modify the DDL concurrency isn't necessary,
// because executing DDL is really I/O bound (or, algorithm bound?),
// and we cost most of time at waiting DDL jobs be enqueued.
// So these jobs won't be faster or slower when machine become faster or slower,
// hence make it a fixed value would be fine.
rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) {
db, _, err := NewDB(g, store, rc.policyMode)
return db, err
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(rc.dbPool)),
)
}
// Sessions are created differently when running in the binary and when running through SQL.
//
// Maybe allow user modify the DDL concurrency isn't necessary,
// because executing DDL is really I/O bound (or, algorithm bound?),
// and we cost most of time at waiting DDL jobs be enqueued.
// So these jobs won't be faster or slower when machine become faster or slower,
// hence make it a fixed value would be fine.
rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) {
db, _, err := NewDB(g, store, rc.policyMode)
return db, err
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(rc.dbPool)),
)
}
return errors.Trace(err)
}
@@ -487,12 +485,21 @@ func (rc *Client) GetRewriteMode() RewriteMode {
return rc.rewriteMode
}

// Close a client.
func (rc *Client) Close() {
func (rc *Client) closeConn() {
// rc.db can be nil in raw kv mode.
if rc.db != nil {
rc.db.Close()
}
for _, db := range rc.dbPool {
db.Close()
}
}

// Close a client.
func (rc *Client) Close() {
// Close the connections; this must succeed when running in SQL mode.
rc.closeConn()

if rc.rawKVClient != nil {
rc.rawKVClient.Close()
}
4 changes: 3 additions & 1 deletion br/pkg/restore/util.go
@@ -281,7 +281,9 @@ func makeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
if e != nil {
return dbPool, e
}
dbPool = append(dbPool, db)
if db != nil {
dbPool = append(dbPool, db)
}
}
return dbPool, nil
}
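
The added nil check above matters because Client.Init (see the client.go hunk earlier) now builds the DDL session pool unconditionally, including when BR runs through SQL (BRIE), where the factory can presumably return no usable *DB; skipping nil results keeps nil slots out of the pool instead of failing later. Below is a small, self-contained illustration of the same pattern using a generic pool and a hypothetical factory; it is not BR code, just a sketch of the behaviour.

package main

import "fmt"

// makePool mirrors the nil-skip behaviour makeDBPool adopts above: a factory
// that returns (nil, nil) contributes nothing, so the pool simply ends up smaller.
func makePool[T any](size int, factory func() (*T, error)) ([]*T, error) {
	pool := make([]*T, 0, size)
	for i := 0; i < size; i++ {
		item, err := factory()
		if err != nil {
			return pool, err
		}
		if item != nil {
			pool = append(pool, item)
		}
	}
	return pool, nil
}

func main() {
	calls := 0
	pool, _ := makePool(4, func() (*int, error) {
		calls++
		if calls%2 == 0 {
			return nil, nil // pretend this session could not be created
		}
		v := calls
		return &v, nil
	})
	fmt.Println(len(pool)) // prints 2: nil results were skipped, not stored
}
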
5 changes: 5 additions & 0 deletions executor/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"batch_point_get.go",
"bind.go",
"brie.go",
"brie_utils.go",
"builder.go",
"calibrate_resource.go",
"change.go",
@@ -276,8 +277,12 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
<<<<<<< HEAD:executor/BUILD.bazel
"calibrate_resource_test.go",
"charset_test.go",
=======
"brie_utils_test.go",
>>>>>>> 8709bb53df5 (brie: support batch ddl for sql restore (#49089)):pkg/executor/BUILD.bazel
"chunk_size_control_test.go",
"cluster_table_test.go",
"collation_test.go",

