infoschema: use maps.Clone to replace iterating (#54581)
ref #54436
D3Hunter authored Jul 12, 2024
1 parent 3a61d79 commit 0b9cd2f
Showing 12 changed files with 66 additions and 145 deletions.
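The change in one line: hand-rolled map-copy loops are replaced with maps.Clone from Go 1.21's standard library. A minimal before/after sketch of the pattern (a hypothetical map type, not TiDB's actual schema maps):

```go
package main

import (
	"fmt"
	"maps"
)

func main() {
	src := map[int64]string{1: "db1", 2: "db2"}

	// Before: copy by iterating every key/value pair.
	dst := make(map[int64]string, len(src))
	for k, v := range src {
		dst[k] = v
	}

	// After: one shallow clone; maps.Clone(nil) also stays nil.
	dst2 := maps.Clone(src)

	fmt.Println(dst, dst2)
}
```

maps.Clone allocates once and preserves nil-ness (a nil map clones to nil), which covers most of what the removed copy* helpers below did by hand.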
45 changes: 0 additions & 45 deletions pkg/ddl/callback.go
@@ -118,29 +118,6 @@ type DefaultCallback struct {
do SchemaLoader
}

-// OnChanged overrides ddl Callback interface.
-func (c *DefaultCallback) OnChanged(err error) error {
-	if err != nil {
-		return err
-	}
-	logutil.DDLLogger().Info("performing DDL change, must reload")
-
-	err = c.do.Reload()
-	if err != nil {
-		logutil.DDLLogger().Error("performing DDL change failed", zap.Error(err))
-	}
-
-	return nil
-}
-
-// OnSchemaStateChanged overrides the ddl Callback interface.
-func (c *DefaultCallback) OnSchemaStateChanged(_ int64) {
-	err := c.do.Reload()
-	if err != nil {
-		logutil.DDLLogger().Error("domain callback failed on schema state changed", zap.Error(err))
-	}
-}

func newDefaultCallBack(do SchemaLoader) Callback {
return &DefaultCallback{BaseCallback: &BaseCallback{}, do: do}
}
@@ -156,28 +133,6 @@ type ctcCallback struct {
do SchemaLoader
}

-// OnChanged overrides ddl Callback interface.
-func (c *ctcCallback) OnChanged(err error) error {
-	if err != nil {
-		return err
-	}
-	logutil.DDLLogger().Info("performing DDL change, must reload")
-
-	err = c.do.Reload()
-	if err != nil {
-		logutil.DDLLogger().Error("performing DDL change failed", zap.Error(err))
-	}
-	return nil
-}
-
-// OnSchemaStateChanged overrides the ddl Callback interface.
-func (c *ctcCallback) OnSchemaStateChanged(_ int64) {
-	err := c.do.Reload()
-	if err != nil {
-		logutil.DDLLogger().Error("domain callback failed on schema state changed", zap.Error(err))
-	}
-}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (*ctcCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
13 changes: 6 additions & 7 deletions pkg/ddl/ddl.go
@@ -1160,11 +1160,6 @@ func (d *ddl) deliverJobTask(task *limitJobTask) {
}
}

-func (*ddl) shouldCheckHistoryJob(job *model.Job) bool {
-	// for local mode job, we add the history job directly now, so no need to check it.
-	return !job.LocalMode
-}

// DoDDLJob will return
// - nil: found in history DDL job and no job error
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
@@ -1212,8 +1207,12 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Notice worker that we push a new job and wait the job done.
d.notifyNewJobSubmitted(d.ddlJobNotifyCh, addingDDLJobNotifyKey, job.ID, job.Type.String())
logutil.DDLLogger().Info("start DDL job", zap.Stringer("job", job), zap.String("query", job.Query))
-	if !d.shouldCheckHistoryJob(job) {
-		return nil
+
+	// for local mode job, we add the history job directly now, so no need to check it.
+	// fast-create doesn't wait schema version synced, we must reload info-schema
+	// here to make sure later statements can see the created table/database.
+	if job.LocalMode {
+		return d.schemaLoader.Reload()
}

var historyJob *model.Job
5 changes: 1 addition & 4 deletions pkg/ddl/ddl_worker.go
@@ -1388,13 +1388,10 @@ func toTError(err error) *terror.Error {

// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens,
// we wait at most 2 * lease time(sessionTTL, 90 seconds).
-func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
+func waitSchemaChanged(ctx context.Context, d *ddlCtx, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return nil
}
-	if waitTime == 0 {
-		return nil
-	}

timeStart := time.Now()
var err error
5 changes: 3 additions & 2 deletions pkg/ddl/job_table.go
@@ -596,7 +596,7 @@ func (s *jobScheduler) runOneJobStep(wk *worker, job *model.Job) error {
return err
}
} else {
-		err := waitSchemaSynced(wk.ctx, s.ddlCtx, job, 2*s.lease)
+		err := waitSchemaSynced(wk.ctx, s.ddlCtx, job)
if err != nil {
time.Sleep(time.Second)
return err
@@ -619,10 +619,11 @@ func (s *jobScheduler) runOneJobStep(wk *worker, job *model.Job) error {
}
})

failpoint.InjectCall("beforeWaitSchemaChanged", job)
// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update
// the newest schema.
-	if err = waitSchemaChanged(wk.ctx, s.ddlCtx, s.lease*2, schemaVer, job); err != nil {
+	if err = waitSchemaChanged(wk.ctx, s.ddlCtx, schemaVer, job); err != nil {
return err
}
s.cleanMDLInfo(job, ownerID)
4 changes: 2 additions & 2 deletions pkg/ddl/schema_version.go
@@ -390,7 +390,7 @@ func checkAllVersions(ctx context.Context, d *ddlCtx, job *model.Job, latestSche
// but schema version might not sync.
// So here we get the latest schema version to make sure all servers' schema version
// update to the latest schema version in a cluster.
-func waitSchemaSynced(ctx context.Context, d *ddlCtx, job *model.Job, waitTime time.Duration) error {
+func waitSchemaSynced(ctx context.Context, d *ddlCtx, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return nil
}
@@ -413,5 +413,5 @@ func waitSchemaSynced(ctx context.Context, d *ddlCtx, job *model.Job, waitTime t
}
})

-	return waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job)
+	return waitSchemaChanged(ctx, d, latestSchemaVersion, job)
}
1 change: 1 addition & 0 deletions pkg/ddl/tests/indexmerge/BUILD.bazel
@@ -23,6 +23,7 @@ go_test(
"//pkg/parser/model",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
16 changes: 5 additions & 11 deletions pkg/ddl/tests/indexmerge/merge_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -80,7 +81,6 @@ func TestAddIndexMergeProcess(t *testing.T) {
}

func TestAddPrimaryKeyMergeProcess(t *testing.T) {
-	// Disable auto schema reload.
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 0)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -94,11 +94,9 @@

var checkErr error
var runDML, backfillDone bool
-	originHook := dom.DDL().GetHook()
-	callback := &callback.TestDDLCallback{
-		Do: nil, // We'll reload the schema manually.
-	}
-	onJobUpdatedExportedFunc := func(job *model.Job) {
+	// only trigger reload when schema version changed
+	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/disableOnTickReload", "return(true)")
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeWaitSchemaChanged", func(job *model.Job) {
if !runDML && job.Type == model.ActionAddPrimaryKey && job.SchemaState == model.StateWriteReorganization {
idx := testutil.FindIdxInfo(dom, "test", "t", "primary")
if idx == nil || idx.BackfillState != model.BackfillStateRunning || job.SnapshotVer == 0 {
@@ -114,12 +112,8 @@ func TestAddPrimaryKeyMergeProcess(t *testing.T) {
// Add delete record 4 to the temporary index.
_, checkErr = tk2.Exec("delete from t where c1 = 4;")
}
-		assert.NoError(t, dom.Reload())
-	}
-	callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
-	dom.DDL().SetHook(callback)
+	})
	tk.MustExec("alter table t add primary key idx(c1);")
-	dom.DDL().SetHook(originHook)
require.True(t, backfillDone)
require.True(t, runDML)
require.NoError(t, checkErr)
39 changes: 22 additions & 17 deletions pkg/domain/domain.go
@@ -983,6 +983,9 @@ func (do *Domain) loadSchemaInLoop(ctx context.Context, lease time.Duration) {
for {
select {
case <-ticker.C:
failpoint.Inject("disableOnTickReload", func() {
failpoint.Continue()
})
err := do.Reload()
if err != nil {
logutil.BgLogger().Error("reload schema in loop failed", zap.Error(err))
@@ -1341,25 +1344,27 @@ func (do *Domain) Init(
return err
}

-	// Only when the store is local that the lease value is 0.
-	// If the store is local, it doesn't need loadSchemaInLoop.
-	if ddlLease > 0 {
-		sub := time.Since(startReloadTime)
-		// The reload(in step 2) operation takes more than ddlLease and a new reload operation was not performed,
-		// the next query will respond by ErrInfoSchemaExpired error. So we do a new reload to update schemaValidator.latestSchemaExpire.
-		if sub > (ddlLease / 2) {
-			logutil.BgLogger().Warn("loading schema and starting ddl take a long time, we do a new reload", zap.Duration("take time", sub))
-			err = do.Reload()
-			if err != nil {
-				return err
-			}
-		}
-
-		// Local store needs to get the change information for every DDL state in each session.
-		do.wg.Run(func() {
-			do.loadSchemaInLoop(ctx, ddlLease)
-		}, "loadSchemaInLoop")
-	}
+	// TODO there are many place set ddlLease to 0, remove them completely, we want
+	// UT and even local uni-store to run similar code path as normal.
+	if ddlLease == 0 {
+		ddlLease = time.Second
+	}
+
+	sub := time.Since(startReloadTime)
+	// The reload(in step 2) operation takes more than ddlLease and a new reload operation was not performed,
+	// the next query will respond by ErrInfoSchemaExpired error. So we do a new reload to update schemaValidator.latestSchemaExpire.
+	if sub > (ddlLease / 2) {
+		logutil.BgLogger().Warn("loading schema and starting ddl take a long time, we do a new reload", zap.Duration("take time", sub))
+		err = do.Reload()
+		if err != nil {
+			return err
+		}
+	}
+
+	// Local store needs to get the change information for every DDL state in each session.
+	do.wg.Run(func() {
+		do.loadSchemaInLoop(ctx, ddlLease)
+	}, "loadSchemaInLoop")
do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
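A note on the disableOnTickReload failpoint added above: pingcap/failpoint markers like failpoint.Inject and failpoint.Continue are rewritten into real control flow in failpoint-instrumented builds, and a test activates a point by its full package path with an expression such as return(true). A hedged sketch of the test-side wiring, using the raw failpoint API that testkit's testfailpoint.Enable wraps; the test name and surrounding setup are hypothetical:

```go
package domain_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestTickerSkipsReload(t *testing.T) {
	// Activate the failpoint so loadSchemaInLoop's ticker branch skips Reload;
	// this assumes a failpoint-instrumented build (failpoint-ctl enable).
	fp := "github.com/pingcap/tidb/pkg/domain/disableOnTickReload"
	require.NoError(t, failpoint.Enable(fp, `return(true)`))
	t.Cleanup(func() {
		require.NoError(t, failpoint.Disable(fp))
	})

	// ... now drive DDL and reload the schema manually, as merge_test.go does above.
}
```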
25 changes: 9 additions & 16 deletions pkg/infoschema/builder.go
@@ -18,6 +18,7 @@ import (
"cmp"
"context"
"fmt"
"maps"
"slices"
"strings"

@@ -805,23 +806,18 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) (*Builder, error)
oldIS := oldSchema.base()
b.initBundleInfoBuilder()
b.infoSchema.schemaMetaVersion = oldIS.schemaMetaVersion
-	b.copySchemasMap(oldIS)
-	b.copyBundlesMap(oldIS)
-	b.copyPoliciesMap(oldIS)
-	b.copyResourceGroupMap(oldIS)
-	b.copyTemporaryTableIDsMap(oldIS)
-	b.copyReferredForeignKeyMap(oldIS)
+	b.infoSchema.schemaMap = maps.Clone(oldIS.schemaMap)
+	b.infoSchema.schemaID2Name = maps.Clone(oldIS.schemaID2Name)
+	b.infoSchema.ruleBundleMap = maps.Clone(oldIS.ruleBundleMap)
+	b.infoSchema.policyMap = oldIS.ClonePlacementPolicies()
+	b.infoSchema.resourceGroupMap = oldIS.CloneResourceGroups()
+	b.infoSchema.temporaryTableIDs = maps.Clone(oldIS.temporaryTableIDs)
+	b.infoSchema.referredForeignKeyMap = maps.Clone(oldIS.referredForeignKeyMap)

copy(b.infoSchema.sortedTablesBuckets, oldIS.sortedTablesBuckets)
return b, nil
}

-func (b *Builder) copySchemasMap(oldIS *infoSchema) {
-	for _, v := range oldIS.schemaMap {
-		b.infoSchema.addSchema(v)
-	}
-}

// getSchemaAndCopyIfNecessary creates a new schemaTables instance when a table in the database has changed.
// It also does modifications on the new one because old schemaTables must be read-only.
// And it will only copy the changed database once in the lifespan of the Builder.
@@ -832,10 +828,7 @@ func (b *Builder) getSchemaAndCopyIfNecessary(dbName string) *model.DBInfo {
oldSchemaTables := b.infoSchema.schemaMap[dbName]
newSchemaTables := &schemaTables{
dbInfo: oldSchemaTables.dbInfo.Copy(),
-		tables: make(map[string]table.Table, len(oldSchemaTables.tables)),
-	}
-	for k, v := range oldSchemaTables.tables {
-		newSchemaTables.tables[k] = v
+		tables: maps.Clone(oldSchemaTables.tables),
}
b.infoSchema.addSchema(newSchemaTables)
return newSchemaTables.dbInfo
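One caveat worth keeping in mind when reading InitWithOldInfoSchema and getSchemaAndCopyIfNecessary above: maps.Clone is shallow, so the old and new snapshots share value pointers. That fits the builder's copy-on-write scheme, where old schema objects are treated as read-only and changed entries get fresh copies (dbInfo.Copy()). A small illustration with hypothetical types:

```go
package main

import (
	"fmt"
	"maps"
)

type tableInfo struct{ name string }

func main() {
	old := map[string]*tableInfo{"t1": {name: "t1"}}
	snap := maps.Clone(old)

	// Independent buckets: adding to the clone leaves the original untouched.
	snap["t2"] = &tableInfo{name: "t2"}
	fmt.Println(len(old), len(snap)) // 1 2

	// Shared values: mutating through either map is visible in both,
	// so shared entries must be treated as read-only.
	snap["t1"].name = "renamed"
	fmt.Println(old["t1"].name) // renamed
}
```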
41 changes: 0 additions & 41 deletions pkg/infoschema/builder_misc.go
@@ -18,7 +18,6 @@ import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
)
@@ -102,46 +101,6 @@ func (b *Builder) addTemporaryTable(tblID int64) {
b.infoSchema.temporaryTableIDs[tblID] = struct{}{}
}

-func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
-	b.infoSchema.ruleBundleMap = make(map[int64]*placement.Bundle)
-	for id, v := range oldIS.ruleBundleMap {
-		b.infoSchema.ruleBundleMap[id] = v
-	}
-}
-
-func (b *Builder) copyPoliciesMap(oldIS *infoSchema) {
-	is := b.infoSchema
-	for _, v := range oldIS.AllPlacementPolicies() {
-		is.policyMap[v.Name.L] = v
-	}
-}
-
-func (b *Builder) copyResourceGroupMap(oldIS *infoSchema) {
-	is := b.infoSchema
-	for _, v := range oldIS.AllResourceGroups() {
-		is.resourceGroupMap[v.Name.L] = v
-	}
-}
-
-func (b *Builder) copyTemporaryTableIDsMap(oldIS *infoSchema) {
-	is := b.infoSchema
-	if len(oldIS.temporaryTableIDs) == 0 {
-		is.temporaryTableIDs = nil
-		return
-	}
-
-	is.temporaryTableIDs = make(map[int64]struct{})
-	for tblID := range oldIS.temporaryTableIDs {
-		is.temporaryTableIDs[tblID] = struct{}{}
-	}
-}
-
-func (b *Builder) copyReferredForeignKeyMap(oldIS *infoSchema) {
-	for k, v := range oldIS.referredForeignKeyMap {
-		b.infoSchema.referredForeignKeyMap[k] = v
-	}
-}

func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) {
info := b.infoSchema
// build the policies.
4 changes: 4 additions & 0 deletions pkg/infoschema/context/infoschema.go
@@ -47,8 +47,12 @@ type Misc interface {
AllPlacementBundles() []*placement.Bundle
// AllPlacementPolicies returns all placement policies
AllPlacementPolicies() []*model.PolicyInfo
+	// ClonePlacementPolicies returns a copy of all placement policies.
+	ClonePlacementPolicies() map[string]*model.PolicyInfo
// AllResourceGroups returns all resource groups
AllResourceGroups() []*model.ResourceGroupInfo
+	// CloneResourceGroups returns a copy of all resource groups.
+	CloneResourceGroups() map[string]*model.ResourceGroupInfo
// HasTemporaryTable returns whether information schema has temporary table
HasTemporaryTable() bool
// GetTableReferredForeignKeys gets the table's ReferredFKInfo by lowercase schema and table name.
13 changes: 13 additions & 0 deletions pkg/infoschema/infoschema.go
@@ -18,6 +18,7 @@ import (
"cmp"
stdctx "context"
"fmt"
"maps"
"slices"
"sort"
"sync"
@@ -513,6 +514,12 @@ func (is *infoSchemaMisc) AllResourceGroups() []*model.ResourceGroupInfo {
return groups
}

+func (is *infoSchemaMisc) CloneResourceGroups() map[string]*model.ResourceGroupInfo {
+	is.resourceGroupMutex.RLock()
+	defer is.resourceGroupMutex.RUnlock()
+	return maps.Clone(is.resourceGroupMap)
+}

// AllPlacementPolicies returns all placement policies
func (is *infoSchemaMisc) AllPlacementPolicies() []*model.PolicyInfo {
is.policyMutex.RLock()
@@ -524,6 +531,12 @@ func (is *infoSchemaMisc) AllPlacementPolicies() []*model.PolicyInfo {
return policies
}

+func (is *infoSchemaMisc) ClonePlacementPolicies() map[string]*model.PolicyInfo {
+	is.policyMutex.RLock()
+	defer is.policyMutex.RUnlock()
+	return maps.Clone(is.policyMap)
+}

func (is *infoSchemaMisc) PlacementBundleByPhysicalTableID(id int64) (*placement.Bundle, bool) {
t, r := is.ruleBundleMap[id]
return t, r
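Both new accessors share one shape: take the read lock, return a maps.Clone snapshot, so callers such as the builder above never hold a reference to the live, mutex-guarded map. A minimal sketch of the pattern; registry and items are hypothetical stand-ins, not TiDB types:

```go
package main

import (
	"maps"
	"sync"
)

// registry guards a map that writers mutate in place.
type registry struct {
	mu    sync.RWMutex
	items map[string]int
}

// CloneItems hands out a shallow snapshot taken under the read lock;
// the caller can iterate or store it without racing against writers.
func (r *registry) CloneItems() map[string]int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return maps.Clone(r.items)
}

func main() {
	r := &registry{items: map[string]int{"a": 1}}
	snap := r.CloneItems()
	snap["b"] = 2 // does not touch r.items
	_ = snap
}
```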
