Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: reduce stats collecor's lock contention (#9233) #9451

Merged
merged 1 commit into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 31 additions & 39 deletions statistics/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,14 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) {
m[tableID] = item
}

func (h *Handle) merge(s *SessionStatsCollector) {
s.Lock()
defer s.Unlock()
func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) {
for id, item := range s.mapper {
h.globalMap.update(id, item.Delta, item.Count, &item.ColSize)
}
h.mu.Lock()
h.mu.rateMap.merge(s.rateMap)
h.mu.Unlock()
s.mapper = make(tableDeltaMap)
rateMap.merge(s.rateMap)
s.rateMap = make(errorRateDeltaMap)
h.feedback = mergeQueryFeedback(h.feedback, s.feedback)
s.mapper = make(tableDeltaMap)
s.feedback = s.feedback[:0]
}

Expand All @@ -134,7 +130,6 @@ type SessionStatsCollector struct {
mapper tableDeltaMap
feedback []*QueryFeedback
rateMap errorRateDeltaMap
prev *SessionStatsCollector
next *SessionStatsCollector
// deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector.
deleted bool
Expand Down Expand Up @@ -207,21 +202,6 @@ func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}, h *Hand
return nil
}

// tryToRemoveFromList will remove this collector from the list if it's deleted flag is set.
func (s *SessionStatsCollector) tryToRemoveFromList() {
s.Lock()
defer s.Unlock()
if !s.deleted {
return
}
next := s.next
prev := s.prev
prev.next = next
if next != nil {
next.prev = prev
}
}

// NewSessionStatsCollector allocates a stats collector for a session.
func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
h.listHead.Lock()
Expand All @@ -230,10 +210,6 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
mapper: make(tableDeltaMap),
rateMap: make(errorRateDeltaMap),
next: h.listHead.next,
prev: h.listHead,
}
if h.listHead.next != nil {
h.listHead.next.prev = newCollector
}
h.listHead.next = newCollector
return newCollector
Expand Down Expand Up @@ -275,15 +251,36 @@ const (
DumpDelta = false
)

// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
prev := h.listHead
prev.Lock()
errorRateMap := make(errorRateDeltaMap)
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
// Merge the session stats into handle and error rate map.
h.merge(curr, errorRateMap)
if curr.deleted {
prev.next = curr.next
// Since the session is already closed, we can safely unlock it here.
curr.Unlock()
} else {
// Unlock the previous lock, so we only holds at most two session's lock at the same time.
prev.Unlock()
prev = curr
}
}
prev.Unlock()
h.mu.Lock()
h.mu.rateMap.merge(errorRateMap)
h.mu.Unlock()
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
// If the `dumpAll` is false, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(dumpMode bool) error {
h.listHead.Lock()
for collector := h.listHead.next; collector != nil; collector = collector.next {
collector.tryToRemoveFromList()
h.merge(collector)
}
h.listHead.Unlock()
h.sweepList()
currentTime := time.Now()
for id, item := range h.globalMap {
if dumpMode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
Expand Down Expand Up @@ -416,12 +413,7 @@ func (h *Handle) dumpFeedbackToKV(fb *QueryFeedback) error {
// it takes 10 minutes for a feedback to take effect. However, we can use the
// feedback locally on this tidb-server, so it could be used more timely.
func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) {
h.listHead.Lock()
for collector := h.listHead.next; collector != nil; collector = collector.next {
collector.tryToRemoveFromList()
h.merge(collector)
}
h.listHead.Unlock()
h.sweepList()
for _, fb := range h.feedback {
table, ok := is.TableByID(fb.tableID)
if !ok {
Expand Down
6 changes: 2 additions & 4 deletions statistics/update_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ func (s *testUpdateListSuite) TestInsertAndDelete(c *C) {
items[0].Delete() // delete tail
items[2].Delete() // delete middle
items[4].Delete() // delete head
h.DumpStatsDeltaToKV(DumpAll)
h.sweepList()

c.Assert(h.listHead.next, Equals, items[3])
c.Assert(items[3].next, Equals, items[1])
c.Assert(items[1].next, IsNil)
c.Assert(items[1].prev, Equals, items[3])
c.Assert(items[3].prev, Equals, h.listHead)

// delete rest
items[1].Delete()
items[3].Delete()
h.DumpStatsDeltaToKV(DumpAll)
h.sweepList()
c.Assert(h.listHead.next, IsNil)
}