Skip to content

Commit

Permalink
client: supports scatter region and get operator status
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Apr 11, 2019
1 parent 9ba0e44 commit 8f8de63
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
31 changes: 31 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Client interface {
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
ScatterRegion(ctx context.Context, regionID uint64) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -516,6 +521,7 @@ func (c *client) Close() {
}
}

// leaderClient gets the client of current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
c.connMu.RLock()
defer c.connMu.RUnlock()
Expand Down Expand Up @@ -752,6 +758,31 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
return resp.GetNewSafePoint(), nil
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
ctx, cancel := context.WithTimeout(ctx, pdTimeout)
resp, err := c.leaderClient().ScatterRegion(ctx, &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
cancel()
if err != nil {
return err
}
if resp.Header.GetError() != nil {
return errors.Errorf("scatter region %d failed: %s", regionID, resp.Header.GetError().String())
}
return nil
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
ctx, cancel := context.WithTimeout(ctx, pdTimeout)
defer cancel()
return c.leaderClient().GetOperator(ctx, &pdpb.GetOperatorRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
}

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.clusterID,
Expand Down
34 changes: 34 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (s *testClientSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.regionHeartbeat, err = s.grpcPDClient.RegionHeartbeat(context.Background())
c.Assert(err, IsNil)
err = s.srv.SetReplicationConfig(server.ReplicationConfig{MaxReplicas: 1})
c.Assert(err, IsNil)
}

func (s *testClientSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -329,3 +331,35 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) {
c.Assert(err, IsNil)
s.checkGCSafePoint(c, math.MaxUint64)
}

func (s *testClientSuite) TestScatterRegion(c *C) {
regionID, _ := regionIDAllocator.Alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peer,
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
err := s.client.ScatterRegion(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
// no operator will create cause of the store number is 1.
resp, err := s.client.GetOperator(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
return c.Check(resp.GetHeader().GetError().GetType(), DeepEquals, pdpb.ErrorType_REGION_NOT_FOUND)
})
c.Succeed()
}

0 comments on commit 8f8de63

Please sign in to comment.