Skip to content

Commit

Permalink
statistics: ease the impact of stats feedback on cluster (#15503) (#1…
Browse files Browse the repository at this point in the history
…8773)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jul 27, 2020
1 parent d44b1e9 commit 9dbeb40
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 132 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ var defaultConf = Config{
RunAutoAnalyze: true,
StmtCountLimit: 5000,
FeedbackProbability: 0.05,
QueryFeedbackLimit: 1024,
QueryFeedbackLimit: 512,
PseudoEstimateRatio: 0.8,
ForcePriority: "NO_PRIORITY",
BindInfoLease: "3s",
Expand Down
2 changes: 1 addition & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ run-auto-analyze = true
feedback-probability = 0.05

# The max number of query feedbacks that are cached in memory.
query-feedback-limit = 1024
query-feedback-limit = 512

# Pseudo stats will be used if the ratio between the modify count and
# row count in statistics of a table is greater than it.
Expand Down
3 changes: 3 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,13 @@ func (s *testSuite1) testAnalyzeIncremental(tk *testkit.TestKit, c *C) {
// Test analyze incremental with feedback.
tk.MustExec("insert into t values (3,3)")
oriProbability := statistics.FeedbackProbability.Load()
oriMinLogCount := handle.MinLogScanCount
defer func() {
statistics.FeedbackProbability.Store(oriProbability)
handle.MinLogScanCount = oriMinLogCount
}()
statistics.FeedbackProbability.Store(1)
handle.MinLogScanCount = 0
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
Expand Down
133 changes: 102 additions & 31 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,64 @@ func NewQueryFeedback(physicalID int64, hist *Histogram, expected int64, desc bo
}
}

// QueryFeedbackKey is the key for a group of feedbacks on the same index/column.
// It is a comparable value type so that it can be used directly as a map key.
type QueryFeedbackKey struct {
	// PhysicalID is taken from QueryFeedback.PhysicalID.
	PhysicalID int64
	// HistID is taken from the ID of the feedback's histogram (QueryFeedback.Hist.ID).
	HistID int64
	// Tp is taken from QueryFeedback.Tp.
	// NOTE(review): presumably distinguishes index feedback from column feedback — confirm.
	Tp int
}

// QueryFeedbackMap is the collection of feedbacks, grouped by QueryFeedbackKey.
type QueryFeedbackMap struct {
	// Size is the total number of feedbacks stored across all groups. It is
	// compared against MaxQueryFeedbackCount to cap the collection.
	Size int
	// Feedbacks holds, for every key, the feedbacks collected for that key.
	Feedbacks map[QueryFeedbackKey][]*QueryFeedback
}

// NewQueryFeedbackMap returns an empty feedback collection whose underlying
// map is already allocated, so feedbacks can be added to it right away.
func NewQueryFeedbackMap() *QueryFeedbackMap {
	fbMap := new(QueryFeedbackMap)
	fbMap.Feedbacks = map[QueryFeedbackKey][]*QueryFeedback{}
	return fbMap
}

// Append stores one feedback in the group identified by its physical ID,
// histogram ID and type. The feedback is silently discarded when the
// collection has already reached MaxQueryFeedbackCount.
func (m *QueryFeedbackMap) Append(q *QueryFeedback) {
	m.append(
		QueryFeedbackKey{PhysicalID: q.PhysicalID, HistID: q.Hist.ID, Tp: q.Tp},
		[]*QueryFeedback{q},
	)
}

// MaxQueryFeedbackCount is the max number of feedbacks that are cached in memory.
// It defaults to 1 << 9 (512) and is stored atomically so that it can be
// changed while feedbacks are being collected.
// NOTE(review): presumably overridden from the query-feedback-limit config option — confirm.
var MaxQueryFeedbackCount = atomic.NewInt64(1 << 9)

// append adds as many of qs to the group k as the remaining capacity allows.
// It returns false, without storing anything, when the collection already
// holds MaxQueryFeedbackCount feedbacks. It returns true otherwise, even if
// only a prefix of qs could be stored.
func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool {
	room := MaxQueryFeedbackCount.Load() - int64(m.Size)
	if room <= 0 {
		return false
	}
	// A missing key yields a nil slice, so one nil check covers both the
	// "absent" and the "present but nil" cases.
	group := m.Feedbacks[k]
	if group == nil {
		group = make([]*QueryFeedback, 0, 8)
	}
	// Never take more feedbacks than there is room for.
	taken := mathutil.MinInt64(int64(len(qs)), room)
	m.Feedbacks[k] = append(group, qs[:taken]...)
	m.Size += int(taken)
	return true
}

// Merge copies the feedback groups of r into m. It stops at the first group
// that cannot be stored because m is full; groups visited after that point
// are dropped. Map iteration order decides which groups are visited first.
func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) {
	for key, group := range r.Feedbacks {
		if stored := m.append(key, group); !stored {
			return
		}
	}
}

var (
// MaxNumberOfRanges is the max number of ranges before split to collect feedback.
MaxNumberOfRanges = 20
Expand Down Expand Up @@ -202,7 +260,7 @@ func (q *QueryFeedback) Actual() int64 {
// Update updates the query feedback. `startKey` is the start scan key of the partial result, used to find
// the range for update. `counts` is the scan counts of each range, used to update the feedback count info.
func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
// Older version do not have the counts info.
// Older versions do not have the counts info.
if len(counts) == 0 {
q.Invalidate()
return
Expand Down Expand Up @@ -248,6 +306,43 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
return
}

// NonOverlappedFeedbacks extracts a set of feedbacks which are not overlapped with each other.
//
// fbs is sorted in place by upper bound, with ties broken by lower bound.
// A feedback is then kept whenever its lower bound is not smaller than the
// upper bound of the last kept feedback. The second result is false when any
// datum comparison fails; in that case the (possibly reordered) input slice
// is returned unchanged in content.
func NonOverlappedFeedbacks(sc *stmtctx.StatementContext, fbs []Feedback) ([]Feedback, bool) {
	// sort.Slice cannot propagate errors, so a comparison failure is recorded
	// here and checked once sorting is done.
	cmpFailed := false
	byUpperThenLower := func(i, j int) bool {
		order, err := fbs[i].Upper.CompareDatum(sc, fbs[j].Upper)
		if err != nil {
			cmpFailed = true
		}
		if cmpFailed || order != 0 {
			return order < 0
		}
		// Equal upper bounds: fall back to the lower bounds.
		order, err = fbs[i].Lower.CompareDatum(sc, fbs[j].Lower)
		if err != nil {
			cmpFailed = true
		}
		return order < 0
	}
	sort.Slice(fbs, byUpperThenLower)
	if cmpFailed {
		return fbs, false
	}

	chosen := make([]Feedback, 0, len(fbs))
	// Starts as a zero-value datum, so the first feedback is compared against it.
	lastUpper := &types.Datum{}
	for _, fb := range fbs {
		order, err := lastUpper.CompareDatum(sc, fb.Lower)
		if err != nil {
			return fbs, false
		}
		if order > 0 {
			// fb starts before the last chosen feedback ends: skip it.
			continue
		}
		chosen = append(chosen, fb)
		lastUpper = fb.Upper
	}
	return chosen, true
}

// BucketFeedback stands for all the feedback for a bucket.
type BucketFeedback struct {
feedback []Feedback // All the feedback info in the same bucket.
Expand Down Expand Up @@ -482,39 +577,15 @@ func (b *BucketFeedback) mergeFullyContainedFeedback(sc *stmtctx.StatementContex
if len(feedbacks) == 0 {
return 0, 0, false
}
// Sort feedbacks by end point and start point incrementally, then pick every feedback that is not overlapped
// with the previous chosen feedbacks.
var existsErr bool
sort.Slice(feedbacks, func(i, j int) bool {
res, err := feedbacks[i].Upper.CompareDatum(sc, feedbacks[j].Upper)
if err != nil {
existsErr = true
}
if existsErr || res != 0 {
return res < 0
}
res, err = feedbacks[i].Lower.CompareDatum(sc, feedbacks[j].Lower)
if err != nil {
existsErr = true
}
return res < 0
})
if existsErr {
sortedFBs, ok := NonOverlappedFeedbacks(sc, feedbacks)
if !ok {
return 0, 0, false
}
previousEnd := &types.Datum{}
var sumFraction, sumCount float64
for _, fb := range feedbacks {
res, err := previousEnd.CompareDatum(sc, fb.Lower)
if err != nil {
return 0, 0, false
}
if res <= 0 {
fraction, _ := getOverlapFraction(fb, bkt)
sumFraction += fraction
sumCount += float64(fb.Count)
previousEnd = fb.Upper
}
for _, fb := range sortedFBs {
fraction, _ := getOverlapFraction(fb, bkt)
sumFraction += fraction
sumCount += float64(fb.Count)
}
return sumFraction, sumCount, true
}
Expand Down
19 changes: 8 additions & 11 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type Handle struct {
schemaVersion int64
}

// It can be read by multiply readers at the same time without acquire lock, but it can be
// written only after acquire the lock.
// It can be read by multiple readers at the same time without acquiring lock, but it can be
// written only after acquiring the lock.
statsCache struct {
sync.Mutex
atomic.Value
Expand All @@ -78,7 +78,7 @@ type Handle struct {
// globalMap contains all the delta map from collectors when we dump them to KV.
globalMap tableDeltaMap
// feedback is used to store query feedback info.
feedback []*statistics.QueryFeedback
feedback *statistics.QueryFeedbackMap

lease atomic2.Duration
}
Expand All @@ -90,7 +90,7 @@ func (h *Handle) Clear() {
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
h.feedback = h.feedback[:0]
h.feedback = statistics.NewQueryFeedbackMap()
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().ProjectionConcurrency = 0
Expand All @@ -100,16 +100,13 @@ func (h *Handle) Clear() {
h.mu.Unlock()
}

// MaxQueryFeedbackCount is the max number of feedback that cache in memory.
var MaxQueryFeedbackCount = atomic2.NewInt64(1 << 10)

// NewHandle creates a Handle for update stats.
func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
feedback: make([]*statistics.QueryFeedback, 0, MaxQueryFeedbackCount.Load()),
feedback: statistics.NewQueryFeedbackMap(),
}
handle.lease.Store(lease)
// It is safe to use it concurrently because the exec won't touch the ctx.
Expand All @@ -132,10 +129,10 @@ func (h *Handle) SetLease(lease time.Duration) {
h.lease.Store(lease)
}

// GetQueryFeedback gets the query feedback. It is only use in test.
func (h *Handle) GetQueryFeedback() []*statistics.QueryFeedback {
// GetQueryFeedback gets the query feedback. It is only used in test.
func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap {
defer func() {
h.feedback = h.feedback[:0]
h.feedback = statistics.NewQueryFeedbackMap()
}()
return h.feedback
}
Expand Down
Loading

0 comments on commit 9dbeb40

Please sign in to comment.