diff --git a/client/client.go b/client/client.go index ebb6536f861..f70037e89d0 100644 --- a/client/client.go +++ b/client/client.go @@ -64,6 +64,11 @@ type Client interface { // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) + // ScatterRegion scatters the specified region. Should use it for a batch of regions, + // and the distribution of these regions will be dispersed. + ScatterRegion(ctx context.Context, regionID uint64) error + // GetOperator gets the status of operator of the specified region. + GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) // Close closes the client. Close() } @@ -516,6 +521,7 @@ func (c *client) Close() { } } +// leaderClient gets the client of current PD leader. func (c *client) leaderClient() pdpb.PDClient { c.connMu.RLock() defer c.connMu.RUnlock() @@ -752,6 +758,31 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 return resp.GetNewSafePoint(), nil } +func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error { + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + resp, err := c.leaderClient().ScatterRegion(ctx, &pdpb.ScatterRegionRequest{ + Header: c.requestHeader(), + RegionId: regionID, + }) + cancel() + if err != nil { + return err + } + if resp.Header.GetError() != nil { + return errors.Errorf("scatter region %d failed: %s", regionID, resp.Header.GetError().String()) + } + return nil +} + +func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + defer cancel() + return c.leaderClient().GetOperator(ctx, &pdpb.GetOperatorRequest{ + Header: c.requestHeader(), + RegionId: regionID, + }) +} + func (c *client) requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ ClusterId: c.clusterID, diff --git a/client/client_test.go b/client/client_test.go index 027c65f3fe6..f3b169826ff 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -75,6 +75,8 @@ func (s *testClientSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) s.regionHeartbeat, err = s.grpcPDClient.RegionHeartbeat(context.Background()) c.Assert(err, IsNil) + err = s.srv.SetReplicationConfig(server.ReplicationConfig{MaxReplicas: 1}) + c.Assert(err, IsNil) } func (s *testClientSuite) TearDownSuite(c *C) { @@ -329,3 +331,35 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) { c.Assert(err, IsNil) s.checkGCSafePoint(c, math.MaxUint64) } + +func (s *testClientSuite) TestScatterRegion(c *C) { + regionID, _ := regionIDAllocator.Alloc() + region := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: []*metapb.Peer{peer}, + } + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(s.srv), + Region: region, + Leader: peer, + } + err := s.regionHeartbeat.Send(req) + c.Assert(err, IsNil) + testutil.WaitUntil(c, func(c *C) bool { + err := s.client.ScatterRegion(context.Background(), regionID) + if c.Check(err, NotNil) { + return false + } + // no operator will create cause of the store number is 1. + resp, err := s.client.GetOperator(context.Background(), regionID) + if c.Check(err, NotNil) { + return false + } + return c.Check(resp.GetHeader().GetError().GetType(), DeepEquals, pdpb.ErrorType_REGION_NOT_FOUND) + }) + c.Succeed() +}