From 0e1d1b65b152c966a8a7b2831d72948d8b1d4e04 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 29 Mar 2018 20:02:21 +0800 Subject: [PATCH 01/10] [DNM] enable coprocess streaming for integrated testing --- config/config.go | 2 +- config/config.toml.example | 2 +- store/tikv/client.go | 7 ++++++- store/tikv/coprocessor.go | 4 ++++ 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 5ef4d0d823ac6..501a207f9d75a 100644 --- a/config/config.go +++ b/config/config.go @@ -229,7 +229,7 @@ var defaultConf = Config{ Lease: "10s", TokenLimit: 1000, OOMAction: "log", - EnableStreaming: false, + EnableStreaming: true, LowerCaseTableNames: 2, Log: Log{ Level: "info", diff --git a/config/config.toml.example b/config/config.toml.example index cb230446d2921..67751f314694c 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -39,7 +39,7 @@ enable-chunk = true oom-action = "log" # Enable coprocessor streaming. -enable-streaming = false +enable-streaming = true # Set system variable 'lower_case_table_names' lower-case-table-names = 2 diff --git a/store/tikv/client.go b/store/tikv/client.go index f71e8df034aed..9cdf146bd9ecc 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -15,6 +15,7 @@ package tikv import ( + "io" "strconv" "sync" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/terror" + log "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -254,7 +256,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R var first *coprocessor.Response first, err = copStream.Recv() if err != nil { - return nil, errors.Trace(err) + if errors.Cause(err) != io.EOF { + return nil, errors.Trace(err) + } + log.Info("copstream returns nothing for the request.") } copStream.Response = first return resp, nil diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index e7178379a7cb8..cc4e009a86174 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -619,6 +619,10 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS defer stream.Close() var resp, lastResp *coprocessor.Response resp = stream.Response + if resp == nil { + // streaming request returns io.EOF, so the first Response is nil. + return nil, nil + } for { remainedTasks, err := it.handleCopResponse(bo, resp, task, ch) if err != nil || len(remainedTasks) != 0 { From 0b4bad8166c11451345a6788c1174cff6b11c2db Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 30 Mar 2018 11:00:53 +0800 Subject: [PATCH 02/10] check range based on last response --- store/tikv/coprocessor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index cc4e009a86174..a77636c2db568 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -612,7 +612,7 @@ func (it *copIterator) handleTaskOnce(bo *Backoffer, task *copTask, ch chan copR } // Handles the response for non-streaming copTask. - return it.handleCopResponse(bo, resp.Cop, task, ch) + return it.handleCopResponse(bo, resp.Cop, task, ch, resp.Cop) } func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan copResponse) ([]*copTask, error) { @@ -624,7 +624,7 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS return nil, nil } for { - remainedTasks, err := it.handleCopResponse(bo, resp, task, ch) + remainedTasks, err := it.handleCopResponse(bo, resp, task, ch, lastResp) if err != nil || len(remainedTasks) != 0 { return remainedTasks, errors.Trace(err) } @@ -656,7 +656,7 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS // handleCopResponse checks coprocessor Response for region split and lock, // returns more tasks when that happens, or handles the response if no error. -func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse) ([]*copTask, error) { +func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastResp *coprocessor.Response) ([]*copTask, error) { if regionErr := resp.GetRegionError(); regionErr != nil { if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil { return nil, errors.Trace(err) @@ -676,7 +676,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon return nil, errors.Trace(err) } } - return buildCopTasksFromRemain(bo, it.store.regionCache, resp, task, it.req.Desc, it.req.Streaming) + return buildCopTasksFromRemain(bo, it.store.regionCache, lastResp, task, it.req.Desc, it.req.Streaming) } if otherErr := resp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) From 05cde223bfd4f246d7499c8638dba4db2ffc2597 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 30 Mar 2018 11:40:38 +0800 Subject: [PATCH 03/10] handle the first response meet lock --- store/tikv/coprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index a77636c2db568..3c6a89ea9f976 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -696,7 +696,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, resp *coprocessor.Response, task *copTask, desc bool, streaming bool) ([]*copTask, error) { remainedRanges := task.ranges - if streaming { + if streaming && resp != nil { remainedRanges = calculateRemain(task.ranges, resp.Range, desc) } return buildCopTasks(bo, cache, remainedRanges, desc, streaming) From 8a0bbd38a5e196a2e97c8c594c10b38da16e1c44 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 30 Mar 2018 18:53:45 +0800 Subject: [PATCH 04/10] executor,store: make leak test more stable --- executor/executor_test.go | 7 ++----- store/store_test.go | 18 ++---------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index d05d964c9719e..6e4c45b7fa40c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -85,6 +85,7 @@ type testSuite struct { var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in executor test") func (s *testSuite) SetUpSuite(c *C) { + testleak.BeforeTest() s.autoIDStep = autoid.GetStep() autoid.SetStep(5000) s.Parser = parser.New() @@ -112,10 +113,7 @@ func (s *testSuite) TearDownSuite(c *C) { s.domain.Close() s.store.Close() autoid.SetStep(s.autoIDStep) -} - -func (s *testSuite) SetUpTest(c *C) { - testleak.BeforeTest() + testleak.AfterTest(c)() } func (s *testSuite) TearDownTest(c *C) { @@ -126,7 +124,6 @@ func (s *testSuite) TearDownTest(c *C) { tableName := tb[0] tk.MustExec(fmt.Sprintf("drop table %v", tableName)) } - testleak.AfterTest(c)() } func (s *testSuite) TestAdmin(c *C) { diff --git a/store/store_test.go b/store/store_test.go index 114771f9ead92..b69f92dbb7014 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -51,6 +51,7 @@ type testKVSuite struct { } func (s *testKVSuite) SetUpSuite(c *C) { + testleak.BeforeTest() store, err := mockstore.NewMockTikvStore() c.Assert(err, IsNil) s.s = store @@ -59,6 +60,7 @@ func (s *testKVSuite) SetUpSuite(c *C) { func (s *testKVSuite) TearDownSuite(c *C) { err := s.s.Close() c.Assert(err, IsNil) + testleak.AfterTest(c)() } func insertData(c *C, txn kv.Transaction) { @@ -155,7 +157,6 @@ func mustGet(c *C, txn kv.Transaction) { } func (s *testKVSuite) TestGetSet(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) @@ -176,7 +177,6 @@ func (s *testKVSuite) TestGetSet(c *C) { } func (s *testKVSuite) TestSeek(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) @@ -196,7 +196,6 @@ func (s *testKVSuite) TestSeek(c *C) { } func (s *testKVSuite) TestInc(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) @@ -231,7 +230,6 @@ func (s *testKVSuite) TestInc(c *C) { } func (s *testKVSuite) TestDelete(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) @@ -267,7 +265,6 @@ func (s *testKVSuite) TestDelete(c *C) { } func (s *testKVSuite) TestDelete2(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) val := []byte("test") @@ -299,7 +296,6 @@ func (s *testKVSuite) TestDelete2(c *C) { } func (s *testKVSuite) TestSetNil(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() defer txn.Commit(context.Background()) c.Assert(err, IsNil) @@ -308,7 +304,6 @@ func (s *testKVSuite) TestSetNil(c *C) { } func (s *testKVSuite) TestBasicSeek(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) txn.Set([]byte("1"), []byte("1")) @@ -324,7 +319,6 @@ func (s *testKVSuite) TestBasicSeek(c *C) { } func (s *testKVSuite) TestBasicTable(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) for i := 1; i < 5; i++ { @@ -372,7 +366,6 @@ func (s *testKVSuite) TestBasicTable(c *C) { } func (s *testKVSuite) TestRollback(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) @@ -400,7 +393,6 @@ func (s *testKVSuite) TestRollback(c *C) { } func (s *testKVSuite) TestSeekMin(c *C) { - defer testleak.AfterTest(c)() rows := []struct { key string value string @@ -435,7 +427,6 @@ func (s *testKVSuite) TestSeekMin(c *C) { } func (s *testKVSuite) TestConditionIfNotExist(c *C) { - defer testleak.AfterTest(c)() var success int64 cnt := 100 b := []byte("1") @@ -470,7 +461,6 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) { } func (s *testKVSuite) TestConditionIfEqual(c *C) { - defer testleak.AfterTest(c)() var success int64 cnt := 100 b := []byte("1") @@ -510,7 +500,6 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { } func (s *testKVSuite) TestConditionUpdate(c *C) { - defer testleak.AfterTest(c)() txn, err := s.s.Begin() c.Assert(err, IsNil) txn.Delete([]byte("b")) @@ -521,7 +510,6 @@ func (s *testKVSuite) TestConditionUpdate(c *C) { func (s *testKVSuite) TestDBClose(c *C) { c.Skip("don't know why it fails.") - defer testleak.AfterTest(c)() store, err := mockstore.NewMockTikvStore() c.Assert(err, IsNil) @@ -564,7 +552,6 @@ func (s *testKVSuite) TestDBClose(c *C) { } func (s *testKVSuite) TestIsolationInc(c *C) { - defer testleak.AfterTest(c)() threadCnt := 4 ids := make(map[int64]struct{}, threadCnt*100) @@ -603,7 +590,6 @@ func (s *testKVSuite) TestIsolationInc(c *C) { } func (s *testKVSuite) TestIsolationMultiInc(c *C) { - defer testleak.AfterTest(c)() threadCnt := 4 incCnt := 100 keyCnt := 4 From 47e0ed57ec51db832715b9ec4f4ab677d0797df5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 2 Apr 2018 18:27:53 +0800 Subject: [PATCH 05/10] disable --- config/config.go | 2 +- config/config.toml.example | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 501a207f9d75a..5ef4d0d823ac6 100644 --- a/config/config.go +++ b/config/config.go @@ -229,7 +229,7 @@ var defaultConf = Config{ Lease: "10s", TokenLimit: 1000, OOMAction: "log", - EnableStreaming: true, + EnableStreaming: false, LowerCaseTableNames: 2, Log: Log{ Level: "info", diff --git a/config/config.toml.example b/config/config.toml.example index 67751f314694c..cb230446d2921 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -39,7 +39,7 @@ enable-chunk = true oom-action = "log" # Enable coprocessor streaming. -enable-streaming = true +enable-streaming = false # Set system variable 'lower_case_table_names' lower-case-table-names = 2 From 6bac9eb7204f418ad313c8a661c5fea2f56bb791 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 3 Apr 2018 14:45:14 +0800 Subject: [PATCH 06/10] address comment --- store/tikv/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 9cdf146bd9ecc..30909d9264cbd 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -259,7 +259,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R if errors.Cause(err) != io.EOF { return nil, errors.Trace(err) } - log.Info("copstream returns nothing for the request.") + log.Debug("copstream returns nothing for the request.") } copStream.Response = first return resp, nil From 877a944767a6ca9f24733ad0f723bebb8e5eda47 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 3 Apr 2018 17:37:47 +0800 Subject: [PATCH 07/10] address comment --- store/tikv/coprocessor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 3c6a89ea9f976..b631eba29e14f 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -656,6 +656,8 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS // handleCopResponse checks coprocessor Response for region split and lock, // returns more tasks when that happens, or handles the response if no error. +// if we're handling streaming coprocessor response, lastResp is the last successful response, +// if we're handling normal request, lastResp and resp is the same. func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastResp *coprocessor.Response) ([]*copTask, error) { if regionErr := resp.GetRegionError(); regionErr != nil { if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil { From 0f4cf658cc29a5a0351e6307b9adcb6c0b9962aa Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 4 Apr 2018 12:50:01 +0800 Subject: [PATCH 08/10] address comment --- store/tikv/coprocessor.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index b631eba29e14f..925d994835e8a 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -612,19 +612,20 @@ func (it *copIterator) handleTaskOnce(bo *Backoffer, task *copTask, ch chan copR } // Handles the response for non-streaming copTask. - return it.handleCopResponse(bo, resp.Cop, task, ch, resp.Cop) + return it.handleCopResponse(bo, resp.Cop, task, ch, nil) } func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan copResponse) ([]*copTask, error) { defer stream.Close() - var resp, lastResp *coprocessor.Response + var resp *coprocessor.Response + var lastRange *coprocessor.KeyRange resp = stream.Response if resp == nil { // streaming request returns io.EOF, so the first Response is nil. return nil, nil } for { - remainedTasks, err := it.handleCopResponse(bo, resp, task, ch, lastResp) + remainedTasks, err := it.handleCopResponse(bo, resp, task, ch, lastRange) if err != nil || len(remainedTasks) != 0 { return remainedTasks, errors.Trace(err) } @@ -640,25 +641,25 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS // No coprocessor.Response for network error, rebuild task based on the last success one. ranges := task.ranges - if lastResp != nil { + if lastRange != nil { if it.req.Desc { - ranges, _ = ranges.split(lastResp.GetRange().Start) + ranges, _ = ranges.split(lastRange.Start) } else { - _, ranges = ranges.split(lastResp.GetRange().End) + _, ranges = ranges.split(lastRange.End) } } log.Info("stream recv timeout:", err) return buildCopTasks(bo, it.store.regionCache, ranges, it.req.Desc, true) } - lastResp = resp + lastRange = resp.Range } } // handleCopResponse checks coprocessor Response for region split and lock, // returns more tasks when that happens, or handles the response if no error. -// if we're handling streaming coprocessor response, lastResp is the last successful response, -// if we're handling normal request, lastResp and resp is the same. -func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastResp *coprocessor.Response) ([]*copTask, error) { +// if we're handling streaming coprocessor response, lastRange is the range of last +// successful response, otherwise it's nil. +func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) { if regionErr := resp.GetRegionError(); regionErr != nil { if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil { return nil, errors.Trace(err) @@ -678,7 +679,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon return nil, errors.Trace(err) } } - return buildCopTasksFromRemain(bo, it.store.regionCache, lastResp, task, it.req.Desc, it.req.Streaming) + return buildCopTasksFromRemain(bo, it.store.regionCache, lastRange, task, it.req.Desc, it.req.Streaming) } if otherErr := resp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) @@ -696,10 +697,10 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon return nil, nil } -func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, resp *coprocessor.Response, task *copTask, desc bool, streaming bool) ([]*copTask, error) { +func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, lastRange *coprocessor.KeyRange, task *copTask, desc bool, streaming bool) ([]*copTask, error) { remainedRanges := task.ranges - if streaming && resp != nil { - remainedRanges = calculateRemain(task.ranges, resp.Range, desc) + if streaming && lastRange != nil { + remainedRanges = calculateRemain(task.ranges, lastRange, desc) } return buildCopTasks(bo, cache, remainedRanges, desc, streaming) } From 79f1c3798c4a31e30103f63a000a1af1f424f9ff Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 4 Apr 2018 13:11:14 +0800 Subject: [PATCH 09/10] simplify code and test --- config/config.go | 2 +- config/config.toml.example | 2 +- store/tikv/coprocessor.go | 10 +--------- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index 5ef4d0d823ac6..501a207f9d75a 100644 --- a/config/config.go +++ b/config/config.go @@ -229,7 +229,7 @@ var defaultConf = Config{ Lease: "10s", TokenLimit: 1000, OOMAction: "log", - EnableStreaming: false, + EnableStreaming: true, LowerCaseTableNames: 2, Log: Log{ Level: "info", diff --git a/config/config.toml.example b/config/config.toml.example index cb230446d2921..67751f314694c 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -39,7 +39,7 @@ enable-chunk = true oom-action = "log" # Enable coprocessor streaming. -enable-streaming = false +enable-streaming = true # Set system variable 'lower_case_table_names' lower-case-table-names = 2 diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 925d994835e8a..a57ad7b083069 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -640,16 +640,8 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS } // No coprocessor.Response for network error, rebuild task based on the last success one. - ranges := task.ranges - if lastRange != nil { - if it.req.Desc { - ranges, _ = ranges.split(lastRange.Start) - } else { - _, ranges = ranges.split(lastRange.End) - } - } log.Info("stream recv timeout:", err) - return buildCopTasks(bo, it.store.regionCache, ranges, it.req.Desc, true) + return buildCopTasksFromRemain(bo, it.store.regionCache, lastRange, task, it.req.Desc, true) } lastRange = resp.Range } From d5eb49e49fc0cdd19411398bc5fda32f532bf04d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 4 Apr 2018 13:37:17 +0800 Subject: [PATCH 10/10] all-test-passed --- config/config.go | 2 +- config/config.toml.example | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 501a207f9d75a..5ef4d0d823ac6 100644 --- a/config/config.go +++ b/config/config.go @@ -229,7 +229,7 @@ var defaultConf = Config{ Lease: "10s", TokenLimit: 1000, OOMAction: "log", - EnableStreaming: true, + EnableStreaming: false, LowerCaseTableNames: 2, Log: Log{ Level: "info", diff --git a/config/config.toml.example b/config/config.toml.example index 2cf39f8248b25..6b082406e0c87 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -36,7 +36,7 @@ token-limit = 1000 oom-action = "log" # Enable coprocessor streaming. -enable-streaming = true +enable-streaming = false # Set system variable 'lower_case_table_names' lower-case-table-names = 2