Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add the refresher as a stats owner listener #56998

Merged
merged 8 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
variable.EnableStatsOwner = do.enableStatsOwner
variable.DisableStatsOwner = do.disableStatsOwner
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.statsOwner.SetListener(do.ddlNotifier)
do.statsOwner.SetListener(owner.NewListenersWrapper(statsHandle, do.ddlNotifier))
do.wg.Run(func() {
do.indexUsageWorker()
}, "indexUsageWorker")
Expand Down
2 changes: 1 addition & 1 deletion pkg/owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ go_test(
],
embed = [":owner"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//pkg/ddl",
"//pkg/infoschema",
Expand Down
25 changes: 25 additions & 0 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,28 @@ func AcquireDistributedLock(
}
}, nil
}

// ListenersWrapper is a list of listeners.
// A way to broadcast events to multiple listeners.
type ListenersWrapper struct {
listeners []Listener
}

// OnBecomeOwner broadcasts the OnBecomeOwner event to all listeners.
func (ol *ListenersWrapper) OnBecomeOwner() {
for _, l := range ol.listeners {
l.OnBecomeOwner()
}
}

// OnRetireOwner broadcasts the OnRetireOwner event to all listeners.
func (ol *ListenersWrapper) OnRetireOwner() {
for _, l := range ol.listeners {
l.OnRetireOwner()
}
}

// NewListenersWrapper creates a new OwnerListeners.
func NewListenersWrapper(listeners ...Listener) *ListenersWrapper {
return &ListenersWrapper{listeners: listeners}
}
16 changes: 16 additions & 0 deletions pkg/owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,19 @@ func TestAcquireDistributedLock(t *testing.T) {
release2()
})
}

func TestListenersWrapper(t *testing.T) {
lis1 := &listener{}
lis2 := &listener{}
wrapper := owner.NewListenersWrapper(lis1, lis2)

// Test OnBecomeOwner
wrapper.OnBecomeOwner()
require.True(t, lis1.val.Load())
require.True(t, lis2.val.Load())

// Test OnRetireOwner
wrapper.OnRetireOwner()
require.False(t, lis1.val.Load())
require.False(t, lis2.val.Load())
}
10 changes: 10 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ func (sa *statsAnalyze) CleanupCorruptedAnalyzeJobsOnDeadInstances() error {
}, statsutil.FlagWrapTxn)
}

// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner.
func (sa *statsAnalyze) OnBecomeOwner() {
sa.refresher.OnBecomeOwner()
}

// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
func (sa *statsAnalyze) OnRetireOwner() {
sa.refresher.OnRetireOwner()
}

// SelectAnalyzeJobsOnCurrentInstanceSQL is the SQL to select the analyze jobs whose
// state is `pending` or `running` and the update time is more than 10 minutes ago
// and the instance is current instance.
Expand Down
32 changes: 17 additions & 15 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ type pqHeap interface {
//
//nolint:fieldalignment
type AnalysisPriorityQueue struct {
ctx context.Context
statsHandle statstypes.StatsHandle
calculator *PriorityCalculator

ctx context.Context
cancel context.CancelFunc
wg util.WaitGroupWrapper
wg util.WaitGroupWrapper

// syncFields is a substructure to hold fields protected by mu.
syncFields struct {
// mu is used to protect the following fields.
mu sync.RWMutex
inner pqHeap
mu sync.RWMutex
// Because the Initialize and Close functions can be called concurrently,
// so we need to protect the cancel function to avoid data race.
cancel context.CancelFunc
inner pqHeap
// runningJobs is a map to store the running jobs. Used to avoid duplicate jobs.
runningJobs map[int64]struct{}
// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
Expand All @@ -97,20 +99,11 @@ type AnalysisPriorityQueue struct {

// NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue2.
func NewAnalysisPriorityQueue(handle statstypes.StatsHandle) *AnalysisPriorityQueue {
ctx, cancel := context.WithCancel(context.Background())

queue := &AnalysisPriorityQueue{
statsHandle: handle,
calculator: NewPriorityCalculator(),
ctx: ctx,
cancel: cancel,
}

queue.syncFields.mu.Lock()
queue.syncFields.runningJobs = make(map[int64]struct{})
queue.syncFields.failedJobs = make(map[int64]struct{})
queue.syncFields.mu.Unlock()

return queue
}

Expand Down Expand Up @@ -144,6 +137,12 @@ func (pq *AnalysisPriorityQueue) Initialize() error {
pq.Close()
return errors.Trace(err)
}

ctx, cancel := context.WithCancel(context.Background())
pq.ctx = ctx
pq.syncFields.cancel = cancel
pq.syncFields.runningJobs = make(map[int64]struct{})
pq.syncFields.failedJobs = make(map[int64]struct{})
pq.syncFields.initialized = true
pq.syncFields.mu.Unlock()

Expand Down Expand Up @@ -813,6 +812,9 @@ func (pq *AnalysisPriorityQueue) Close() {
return
}

pq.cancel()
// It is possible that the priority queue is not initialized.
if pq.syncFields.cancel != nil {
pq.syncFields.cancel()
}
pq.wg.Wait()
}
12 changes: 12 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,15 @@ func (r *Refresher) Close() {
r.jobs.Close()
}
}

// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner.
func (*Refresher) OnBecomeOwner() {
// No action is taken when becoming the stats owner.
// Initialization of the Refresher can fail, so operations are deferred until the first auto-analyze check.
}

// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
func (r *Refresher) OnRetireOwner() {
// Stop the worker and close the queue.
r.jobs.Close()
}
6 changes: 6 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ type StatsAnalyze interface {
// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool

// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner.
OnBecomeOwner()
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved

// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
OnRetireOwner()

// Close closes the analyze worker.
Close()
}
Expand Down