Skip to content

Commit

Permalink
executor,distsql: fix analyze version 2 memory leak (#28729)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Oct 12, 2021
1 parent e933e00 commit d53f9f5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 10 deletions.
7 changes: 7 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -273,6 +274,12 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
failpoint.Inject("mockNextRawError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mockNextRawError"))
}
})

resultSubset, err := r.resp.Next(ctx)
r.partialCount++
r.feedback.Invalidate()
Expand Down
28 changes: 18 additions & 10 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,21 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
return nil
}

func readDataAndSendTask(handler *tableResultHandler, mergeTaskCh chan []byte) error {
defer close(mergeTaskCh)
for {
data, err := handler.nextRaw(context.TODO())
if err != nil {
return errors.Trace(err)
}
if data == nil {
break
}
mergeTaskCh <- data
}
return nil
}

func (e *AnalyzeColumnsExec) buildSamplingStats(
ranges []*ranger.Range,
needExtStats bool,
Expand Down Expand Up @@ -843,17 +858,10 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
for i := 0; i < statsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i == 0)
}
for {
data, err1 := e.resultHandler.nextRaw(context.TODO())
if err1 != nil {
return 0, nil, nil, nil, nil, err1
}
if data == nil {
break
}
mergeTaskCh <- data
if err = readDataAndSendTask(e.resultHandler, mergeTaskCh); err != nil {
return 0, nil, nil, nil, nil, err
}
close(mergeTaskCh)

mergeWorkerPanicCnt := 0
for mergeWorkerPanicCnt < statsConcurrency {
mergeResult, ok := <-mergeResultCh
Expand Down
15 changes: 15 additions & 0 deletions executor/seqtest/seq_executor_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,3 +1573,18 @@ func TestIssue19410(t *testing.T) {
tk.MustQuery("select /*+ INL_HASH_JOIN(t3) */ * from t join t3 on t.b = t3.b1;").Check(testkit.Rows("1 A 1 A"))
tk.MustQuery("select /*+ INL_JOIN(t3) */ * from t join t3 on t.b = t3.b1;").Check(testkit.Rows("1 A 1 A"))
}

func TestAnalyzeNextRawErrorNoLeak(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int, c varchar(32))")
tk.MustExec("set @@session.tidb_analyze_version = 2")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/distsql/mockNextRawError", `return(true)`))
err := tk.ExecToErr("analyze table t1")
require.EqualError(t, err, "mockNextRawError")
}

0 comments on commit d53f9f5

Please sign in to comment.