Skip to content

Commit

Permalink
client: supports scatter region and get operator status (#1501)
Browse files Browse the repository at this point in the history
* client: supports scatter region and get operator status

* imporve test

Signed-off-by: nolouch <nolouch@gmail.com>

* clean up

Signed-off-by: nolouch <nolouch@gmail.com>

* add metrics

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and disksing committed Apr 15, 2019
1 parent 656a830 commit fc997de
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 26 deletions.
45 changes: 45 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Client interface {
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
ScatterRegion(ctx context.Context, regionID uint64) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -516,6 +521,7 @@ func (c *client) Close() {
}
}

// leaderClient gets the client of current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
c.connMu.RLock()
defer c.connMu.RUnlock()
Expand Down Expand Up @@ -752,6 +758,45 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
return resp.GetNewSafePoint(), nil
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDuration.WithLabelValues("scatter_region").Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, pdTimeout)
resp, err := c.leaderClient().ScatterRegion(ctx, &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
cancel()
if err != nil {
return err
}
if resp.Header.GetError() != nil {
return errors.Errorf("scatter region %d failed: %s", regionID, resp.Header.GetError().String())
}
return nil
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDuration.WithLabelValues("get_operator").Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, pdTimeout)
defer cancel()
return c.leaderClient().GetOperator(ctx, &pdpb.GetOperatorRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
}

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.clusterID,
Expand Down
132 changes: 106 additions & 26 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,45 @@ func TestClient(t *testing.T) {

var _ = Suite(&testClientSuite{})

type idAllocator struct {
allocator *core.MockIDAllocator
}

func (i *idAllocator) alloc() uint64 {
id, _ := i.allocator.Alloc()
return id
}

var (
regionIDAllocator = &core.MockIDAllocator{}
regionIDAllocator = &idAllocator{allocator: &core.MockIDAllocator{}}
// Note: IDs below are entirely arbitrary. They are only for checking
// whether GetRegion/GetStore works.
// If we alloc ID in client in the future, these IDs must be updated.
store = &metapb.Store{
Id: 1,
Address: "localhost",
stores = []*metapb.Store{
{Id: 1,
Address: "localhost:1",
},
{Id: 2,
Address: "localhost:2",
},
{Id: 3,
Address: "localhost:3",
},
{Id: 4,
Address: "localhost:4",
},
}
peer = &metapb.Peer{
Id: 2,
StoreId: store.GetId(),

peers = []*metapb.Peer{
{Id: regionIDAllocator.alloc(),
StoreId: stores[0].GetId(),
},
{Id: regionIDAllocator.alloc(),
StoreId: stores[1].GetId(),
},
{Id: regionIDAllocator.alloc(),
StoreId: stores[2].GetId(),
},
}
)

Expand All @@ -75,6 +102,11 @@ func (s *testClientSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.regionHeartbeat, err = s.grpcPDClient.RegionHeartbeat(context.Background())
c.Assert(err, IsNil)
cluster := s.srv.GetRaftCluster()
c.Assert(cluster, NotNil)
for _, store := range stores {
s.srv.PutStore(context.Background(), &pdpb.PutStoreRequest{Header: newHeader(s.srv), Store: store})
}
}

func (s *testClientSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -110,18 +142,18 @@ func newHeader(srv *server.Server) *pdpb.RequestHeader {
}

func bootstrapServer(c *C, header *pdpb.RequestHeader, client pdpb.PDClient) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers[:1],
}
req := &pdpb.BootstrapRequest{
Header: header,
Store: store,
Store: stores[0],
Region: region,
}
_, err := client.Bootstrap(context.Background(), req)
Expand Down Expand Up @@ -163,19 +195,19 @@ func (s *testClientSuite) TestTSORace(c *C) {
}

func (s *testClientSuite) TestGetRegion(c *C) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -184,7 +216,7 @@ func (s *testClientSuite) TestGetRegion(c *C) {
r, leader, err := s.client.GetRegion(context.Background(), []byte("a"))
c.Assert(err, IsNil)
return c.Check(r, DeepEquals, region) &&
c.Check(leader, DeepEquals, peer)
c.Check(leader, DeepEquals, peers[0])
})
c.Succeed()
}
Expand All @@ -193,7 +225,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
regionLen := 10
regions := make([]*metapb.Region, 0, regionLen)
for i := 0; i < regionLen; i++ {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
r := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
Expand All @@ -202,13 +234,13 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
regions = append(regions, r)
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: r,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -218,7 +250,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
r, leader, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)})
c.Assert(err, IsNil)
if i > 0 && i < regionLen {
return c.Check(leader, DeepEquals, peer) &&
return c.Check(leader, DeepEquals, peers[0]) &&
c.Check(r, DeepEquals, regions[i-1])
}
return c.Check(leader, IsNil) &&
Expand All @@ -229,19 +261,19 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -250,14 +282,15 @@ func (s *testClientSuite) TestGetRegionByID(c *C) {
r, leader, err := s.client.GetRegionByID(context.Background(), regionID)
c.Assert(err, IsNil)
return c.Check(r, DeepEquals, region) &&
c.Check(leader, DeepEquals, peer)
c.Check(leader, DeepEquals, peers[0])
})
c.Succeed()
}

func (s *testClientSuite) TestGetStore(c *C) {
cluster := s.srv.GetRaftCluster()
c.Assert(cluster, NotNil)
store := stores[0]

// Get an up store should be OK.
n, err := s.client.GetStore(context.Background(), store.GetId())
Expand All @@ -266,7 +299,7 @@ func (s *testClientSuite) TestGetStore(c *C) {

stores, err := s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{store})
c.Assert(stores, DeepEquals, stores)

// Mark the store as offline.
err = cluster.RemoveStore(store.GetId())
Expand All @@ -280,9 +313,16 @@ func (s *testClientSuite) TestGetStore(c *C) {
c.Assert(n, DeepEquals, offlineStore)

// Should return offline stores.
contains := false
stores, err = s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{offlineStore})
for _, store := range stores {
if store.GetId() == offlineStore.GetId() {
contains = true
c.Assert(store, DeepEquals, offlineStore)
}
}
c.Assert(contains, IsTrue)

// Mark the store as tombstone.
err = cluster.BuryStore(store.GetId(), true)
Expand All @@ -296,14 +336,23 @@ func (s *testClientSuite) TestGetStore(c *C) {
c.Assert(n, IsNil)

// Should return tombstone stores.
contains = false
stores, err = s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{tombstoneStore})
for _, store := range stores {
if store.GetId() == tombstoneStore.GetId() {
contains = true
c.Assert(store, DeepEquals, tombstoneStore)
}
}
c.Assert(contains, IsTrue)

// Should not return tombstone stores.
stores, err = s.client.GetAllStores(context.Background(), WithExcludeTombstone())
c.Assert(err, IsNil)
c.Assert(stores, IsNil)
for _, store := range stores {
c.Assert(store, Not(Equals), tombstoneStore)
}
}

func (s *testClientSuite) checkGCSafePoint(c *C, expectedSafePoint uint64) {
Expand All @@ -329,3 +378,34 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) {
c.Assert(err, IsNil)
s.checkGCSafePoint(c, math.MaxUint64)
}

func (s *testClientSuite) TestScatterRegion(c *C) {
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
err := s.client.ScatterRegion(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
resp, err := s.client.GetOperator(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
return c.Check(resp.GetRegionId(), Equals, regionID) && c.Check(string(resp.GetDesc()), Equals, "scatter-region") && c.Check(resp.GetStatus(), Equals, pdpb.OperatorStatus_RUNNING)
})
c.Succeed()
}

0 comments on commit fc997de

Please sign in to comment.