Skip to content

Commit

Permalink
tikv: make requests fail-fast for dial timeouted (#12819) (#12926)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and ngaut committed Oct 28, 2019
1 parent e69972d commit 36d9730
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
60 changes: 49 additions & 11 deletions store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

type batchConn struct {
Expand Down Expand Up @@ -217,6 +218,17 @@ func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}

if err := c.initBatchClient(); err != nil {
logutil.Logger(context.Background()).Warn(
"init create streaming fail",
zap.String("target", c.target),
zap.Error(err),
)
c.failPendingRequests(err)
return
}

if err := c.client.Send(request); err != nil {
logutil.Logger(context.Background()).Info(
"sending batch commands meets error",
Expand Down Expand Up @@ -248,19 +260,40 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
})
}

func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.
func (c *batchCommandsClient) waitConnReady() (err error) {
dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
for {
s := c.conn.GetState()
if s == connectivity.Ready {
cancel()
break
}
if !c.conn.WaitForStateChange(dialCtx, s) {
cancel()
err = dialCtx.Err()
return
}
}
return
}

func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error {
c.failPendingRequests(perr) // fail all pending requests.

err := c.waitConnReady()
// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient

return nil
tikvClient := tikvpb.NewTikvClient(c.conn)
var streamClient tikvpb.Tikv_BatchCommandsClient
streamClient, err = tikvClient.BatchCommands(context.TODO())
if err == nil {
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming success",
zap.String("target", c.target),
)
c.client = streamClient
return nil
}
}
logutil.Logger(context.Background()).Info(
"batchRecvLoop re-create streaming fail",
Expand Down Expand Up @@ -470,6 +503,11 @@ func (c *batchCommandsClient) initBatchClient() error {
if c.client != nil {
return nil
}

if err := c.waitConnReady(); err != nil {
return err
}

// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -275,6 +276,12 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
if err != nil {
return nil, err
}
// enable by `curl -XPUT -d '1*return("[some-addr]")->return("")' http://host:port/github.com/pingcap/tidb/store/tikv/injectWrongStoreAddr`
failpoint.Inject("injectWrongStoreAddr", func(val failpoint.Value) {
if a, ok := val.(string); ok && len(a) > 0 {
addr = a
}
})
if store == nil || len(addr) == 0 {
// Store not found, region must be out of date.
cachedRegion.invalidate()
Expand Down

0 comments on commit 36d9730

Please sign in to comment.