Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: coprocess streaming tiny fix #6186

Merged
merged 14 commits into from
Apr 4, 2018
7 changes: 6 additions & 1 deletion store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tikv

import (
"io"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/terror"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -254,7 +256,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
var first *coprocessor.Response
first, err = copStream.Recv()
if err != nil {
return nil, errors.Trace(err)
if errors.Cause(err) != io.EOF {
return nil, errors.Trace(err)
}
log.Debug("copstream returns nothing for the request.")
}
copStream.Response = first
return resp, nil
Expand Down
35 changes: 17 additions & 18 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,20 @@ func (it *copIterator) handleTaskOnce(bo *Backoffer, task *copTask, ch chan copR
}

// Handles the response for non-streaming copTask.
return it.handleCopResponse(bo, resp.Cop, task, ch)
return it.handleCopResponse(bo, resp.Cop, task, ch, nil)
}

func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan copResponse) ([]*copTask, error) {
defer stream.Close()
var resp, lastResp *coprocessor.Response
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
resp = stream.Response
if resp == nil {
// streaming request returns io.EOF, so the first Response is nil.
return nil, nil
}
for {
remainedTasks, err := it.handleCopResponse(bo, resp, task, ch)
remainedTasks, err := it.handleCopResponse(bo, resp, task, ch, lastRange)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand All @@ -635,24 +640,18 @@ func (it *copIterator) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopS
}

// No coprocessor.Response for network error, rebuild task based on the last success one.
ranges := task.ranges
if lastResp != nil {
if it.req.Desc {
ranges, _ = ranges.split(lastResp.GetRange().Start)
} else {
_, ranges = ranges.split(lastResp.GetRange().End)
}
}
log.Info("stream recv timeout:", err)
return buildCopTasks(bo, it.store.regionCache, ranges, it.req.Desc, true)
return buildCopTasksFromRemain(bo, it.store.regionCache, lastRange, task, it.req.Desc, true)
}
lastResp = resp
lastRange = resp.Range
}
}

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse) ([]*copTask, error) {
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
if regionErr := resp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
Expand All @@ -672,7 +671,7 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon
return nil, errors.Trace(err)
}
}
return buildCopTasksFromRemain(bo, it.store.regionCache, resp, task, it.req.Desc, it.req.Streaming)
return buildCopTasksFromRemain(bo, it.store.regionCache, lastRange, task, it.req.Desc, it.req.Streaming)
}
if otherErr := resp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
Expand All @@ -690,10 +689,10 @@ func (it *copIterator) handleCopResponse(bo *Backoffer, resp *coprocessor.Respon
return nil, nil
}

func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, resp *coprocessor.Response, task *copTask, desc bool, streaming bool) ([]*copTask, error) {
func buildCopTasksFromRemain(bo *Backoffer, cache *RegionCache, lastRange *coprocessor.KeyRange, task *copTask, desc bool, streaming bool) ([]*copTask, error) {
remainedRanges := task.ranges
if streaming {
remainedRanges = calculateRemain(task.ranges, resp.Range, desc)
if streaming && lastRange != nil {
remainedRanges = calculateRemain(task.ranges, lastRange, desc)
}
return buildCopTasks(bo, cache, remainedRanges, desc, streaming)
}
Expand Down