Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: supports scatter region and get operator status #1501

Merged
merged 5 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Client interface {
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
ScatterRegion(ctx context.Context, regionID uint64) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -516,6 +521,7 @@ func (c *client) Close() {
}
}

// leaderClient gets the client of current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
c.connMu.RLock()
defer c.connMu.RUnlock()
Expand Down Expand Up @@ -752,6 +758,45 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
return resp.GetNewSafePoint(), nil
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDuration.WithLabelValues("scatter_region").Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, pdTimeout)
resp, err := c.leaderClient().ScatterRegion(ctx, &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
cancel()
if err != nil {
return err
}
if resp.Header.GetError() != nil {
return errors.Errorf("scatter region %d failed: %s", regionID, resp.Header.GetError().String())
}
return nil
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDuration.WithLabelValues("get_operator").Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, pdTimeout)
defer cancel()
return c.leaderClient().GetOperator(ctx, &pdpb.GetOperatorRequest{
Header: c.requestHeader(),
RegionId: regionID,
})
}

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.clusterID,
Expand Down
132 changes: 106 additions & 26 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,45 @@ func TestClient(t *testing.T) {

var _ = Suite(&testClientSuite{})

type idAllocator struct {
allocator *core.MockIDAllocator
}

func (i *idAllocator) alloc() uint64 {
id, _ := i.allocator.Alloc()
return id
}

var (
regionIDAllocator = &core.MockIDAllocator{}
regionIDAllocator = &idAllocator{allocator: &core.MockIDAllocator{}}
// Note: IDs below are entirely arbitrary. They are only for checking
// whether GetRegion/GetStore works.
// If we alloc ID in client in the future, these IDs must be updated.
store = &metapb.Store{
Id: 1,
Address: "localhost",
stores = []*metapb.Store{
{Id: 1,
Address: "localhost:1",
},
{Id: 2,
Address: "localhost:2",
},
{Id: 3,
Address: "localhost:3",
},
{Id: 4,
Address: "localhost:4",
},
}
peer = &metapb.Peer{
Id: 2,
StoreId: store.GetId(),

peers = []*metapb.Peer{
{Id: regionIDAllocator.alloc(),
StoreId: stores[0].GetId(),
},
{Id: regionIDAllocator.alloc(),
StoreId: stores[1].GetId(),
},
{Id: regionIDAllocator.alloc(),
StoreId: stores[2].GetId(),
},
}
)

Expand All @@ -75,6 +102,11 @@ func (s *testClientSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.regionHeartbeat, err = s.grpcPDClient.RegionHeartbeat(context.Background())
c.Assert(err, IsNil)
cluster := s.srv.GetRaftCluster()
c.Assert(cluster, NotNil)
for _, store := range stores {
s.srv.PutStore(context.Background(), &pdpb.PutStoreRequest{Header: newHeader(s.srv), Store: store})
}
}

func (s *testClientSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -110,18 +142,18 @@ func newHeader(srv *server.Server) *pdpb.RequestHeader {
}

func bootstrapServer(c *C, header *pdpb.RequestHeader, client pdpb.PDClient) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers[:1],
}
req := &pdpb.BootstrapRequest{
Header: header,
Store: store,
Store: stores[0],
Region: region,
}
_, err := client.Bootstrap(context.Background(), req)
Expand Down Expand Up @@ -163,19 +195,19 @@ func (s *testClientSuite) TestTSORace(c *C) {
}

func (s *testClientSuite) TestGetRegion(c *C) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -184,7 +216,7 @@ func (s *testClientSuite) TestGetRegion(c *C) {
r, leader, err := s.client.GetRegion(context.Background(), []byte("a"))
c.Assert(err, IsNil)
return c.Check(r, DeepEquals, region) &&
c.Check(leader, DeepEquals, peer)
c.Check(leader, DeepEquals, peers[0])
})
c.Succeed()
}
Expand All @@ -193,7 +225,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
regionLen := 10
regions := make([]*metapb.Region, 0, regionLen)
for i := 0; i < regionLen; i++ {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
r := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
Expand All @@ -202,13 +234,13 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
regions = append(regions, r)
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: r,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -218,7 +250,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
r, leader, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)})
c.Assert(err, IsNil)
if i > 0 && i < regionLen {
return c.Check(leader, DeepEquals, peer) &&
return c.Check(leader, DeepEquals, peers[0]) &&
c.Check(r, DeepEquals, regions[i-1])
}
return c.Check(leader, IsNil) &&
Expand All @@ -229,19 +261,19 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
regionID, _ := regionIDAllocator.Alloc()
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: []*metapb.Peer{peer},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peer,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
Expand All @@ -250,14 +282,15 @@ func (s *testClientSuite) TestGetRegionByID(c *C) {
r, leader, err := s.client.GetRegionByID(context.Background(), regionID)
c.Assert(err, IsNil)
return c.Check(r, DeepEquals, region) &&
c.Check(leader, DeepEquals, peer)
c.Check(leader, DeepEquals, peers[0])
})
c.Succeed()
}

func (s *testClientSuite) TestGetStore(c *C) {
cluster := s.srv.GetRaftCluster()
c.Assert(cluster, NotNil)
store := stores[0]

// Get an up store should be OK.
n, err := s.client.GetStore(context.Background(), store.GetId())
Expand All @@ -266,7 +299,7 @@ func (s *testClientSuite) TestGetStore(c *C) {

stores, err := s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{store})
c.Assert(stores, DeepEquals, stores)

// Mark the store as offline.
err = cluster.RemoveStore(store.GetId())
Expand All @@ -280,9 +313,16 @@ func (s *testClientSuite) TestGetStore(c *C) {
c.Assert(n, DeepEquals, offlineStore)

// Should return offline stores.
contains := false
stores, err = s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{offlineStore})
for _, store := range stores {
if store.GetId() == offlineStore.GetId() {
contains = true
c.Assert(store, DeepEquals, offlineStore)
}
}
c.Assert(contains, IsTrue)

// Mark the store as tombstone.
err = cluster.BuryStore(store.GetId(), true)
Expand All @@ -296,14 +336,23 @@ func (s *testClientSuite) TestGetStore(c *C) {
c.Assert(n, IsNil)

// Should return tombstone stores.
contains = false
stores, err = s.client.GetAllStores(context.Background())
c.Assert(err, IsNil)
c.Assert(stores, DeepEquals, []*metapb.Store{tombstoneStore})
for _, store := range stores {
if store.GetId() == tombstoneStore.GetId() {
contains = true
c.Assert(store, DeepEquals, tombstoneStore)
}
}
c.Assert(contains, IsTrue)

// Should not return tombstone stores.
stores, err = s.client.GetAllStores(context.Background(), WithExcludeTombstone())
c.Assert(err, IsNil)
c.Assert(stores, IsNil)
for _, store := range stores {
c.Assert(store, Not(Equals), tombstoneStore)
}
}

func (s *testClientSuite) checkGCSafePoint(c *C, expectedSafePoint uint64) {
Expand All @@ -329,3 +378,34 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) {
c.Assert(err, IsNil)
s.checkGCSafePoint(c, math.MaxUint64)
}

func (s *testClientSuite) TestScatterRegion(c *C) {
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: region,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
err := s.client.ScatterRegion(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
resp, err := s.client.GetOperator(context.Background(), regionID)
if c.Check(err, NotNil) {
return false
}
return c.Check(resp.GetRegionId(), Equals, regionID) && c.Check(string(resp.GetDesc()), Equals, "scatter-region") && c.Check(resp.GetStatus(), Equals, pdpb.OperatorStatus_RUNNING)
})
c.Succeed()
}