diff --git a/client/client.go b/client/client.go index ebb6536f861..436e68c4de7 100644 --- a/client/client.go +++ b/client/client.go @@ -64,6 +64,11 @@ type Client interface { // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) + // ScatterRegion scatters the specified region. Should use it for a batch of regions, + // and the distribution of these regions will be dispersed. + ScatterRegion(ctx context.Context, regionID uint64) error + // GetOperator gets the status of operator of the specified region. + GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) // Close closes the client. Close() } @@ -516,6 +521,7 @@ func (c *client) Close() { } } +// leaderClient gets the client of current PD leader. func (c *client) leaderClient() pdpb.PDClient { c.connMu.RLock() defer c.connMu.RUnlock() @@ -752,6 +758,45 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 return resp.GetNewSafePoint(), nil } +func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDuration.WithLabelValues("scatter_region").Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + resp, err := c.leaderClient().ScatterRegion(ctx, &pdpb.ScatterRegionRequest{ + Header: c.requestHeader(), + RegionId: regionID, + }) + cancel() + if err != nil { + return err + } + if resp.Header.GetError() != nil { + return errors.Errorf("scatter region %d failed: %s", regionID, resp.Header.GetError().String()) + } + return nil +} + +func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDuration.WithLabelValues("get_operator").Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + defer cancel() + return c.leaderClient().GetOperator(ctx, &pdpb.GetOperatorRequest{ + Header: c.requestHeader(), + RegionId: regionID, + }) +} + func (c *client) requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ ClusterId: c.clusterID, diff --git a/client/client_test.go b/client/client_test.go index 027c65f3fe6..03fa97b036b 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -38,18 +38,45 @@ func TestClient(t *testing.T) { var _ = Suite(&testClientSuite{}) +type idAllocator struct { + allocator *core.MockIDAllocator +} + +func (i *idAllocator) alloc() uint64 { + id, _ := i.allocator.Alloc() + return id +} + var ( - regionIDAllocator = &core.MockIDAllocator{} + regionIDAllocator = &idAllocator{allocator: &core.MockIDAllocator{}} // Note: IDs below are entirely arbitrary. They are only for checking // whether GetRegion/GetStore works. // If we alloc ID in client in the future, these IDs must be updated. - store = &metapb.Store{ - Id: 1, - Address: "localhost", + stores = []*metapb.Store{ + {Id: 1, + Address: "localhost:1", + }, + {Id: 2, + Address: "localhost:2", + }, + {Id: 3, + Address: "localhost:3", + }, + {Id: 4, + Address: "localhost:4", + }, } - peer = &metapb.Peer{ - Id: 2, - StoreId: store.GetId(), + + peers = []*metapb.Peer{ + {Id: regionIDAllocator.alloc(), + StoreId: stores[0].GetId(), + }, + {Id: regionIDAllocator.alloc(), + StoreId: stores[1].GetId(), + }, + {Id: regionIDAllocator.alloc(), + StoreId: stores[2].GetId(), + }, } ) @@ -75,6 +102,11 @@ func (s *testClientSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) s.regionHeartbeat, err = s.grpcPDClient.RegionHeartbeat(context.Background()) c.Assert(err, IsNil) + cluster := s.srv.GetRaftCluster() + c.Assert(cluster, NotNil) + for _, store := range stores { + s.srv.PutStore(context.Background(), &pdpb.PutStoreRequest{Header: newHeader(s.srv), Store: store}) + } } func (s *testClientSuite) TearDownSuite(c *C) { @@ -110,18 +142,18 @@ func newHeader(srv *server.Server) *pdpb.RequestHeader { } func bootstrapServer(c *C, header *pdpb.RequestHeader, client pdpb.PDClient) { - regionID, _ := regionIDAllocator.Alloc() + regionID := regionIDAllocator.alloc() region := &metapb.Region{ Id: regionID, RegionEpoch: &metapb.RegionEpoch{ ConfVer: 1, Version: 1, }, - Peers: []*metapb.Peer{peer}, + Peers: peers[:1], } req := &pdpb.BootstrapRequest{ Header: header, - Store: store, + Store: stores[0], Region: region, } _, err := client.Bootstrap(context.Background(), req) @@ -163,19 +195,19 @@ func (s *testClientSuite) TestTSORace(c *C) { } func (s *testClientSuite) TestGetRegion(c *C) { - regionID, _ := regionIDAllocator.Alloc() + regionID := regionIDAllocator.alloc() region := &metapb.Region{ Id: regionID, RegionEpoch: &metapb.RegionEpoch{ ConfVer: 1, Version: 1, }, - Peers: []*metapb.Peer{peer}, + Peers: peers, } req := &pdpb.RegionHeartbeatRequest{ Header: newHeader(s.srv), Region: region, - Leader: peer, + Leader: peers[0], } err := s.regionHeartbeat.Send(req) c.Assert(err, IsNil) @@ -184,7 +216,7 @@ func (s *testClientSuite) TestGetRegion(c *C) { r, leader, err := s.client.GetRegion(context.Background(), []byte("a")) c.Assert(err, IsNil) return c.Check(r, DeepEquals, region) && - c.Check(leader, DeepEquals, peer) + c.Check(leader, DeepEquals, peers[0]) }) c.Succeed() } @@ -193,7 +225,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { regionLen := 10 regions := make([]*metapb.Region, 0, regionLen) for i := 0; i < regionLen; i++ { - regionID, _ := regionIDAllocator.Alloc() + regionID := regionIDAllocator.alloc() r := &metapb.Region{ Id: regionID, RegionEpoch: &metapb.RegionEpoch{ @@ -202,13 +234,13 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { }, StartKey: []byte{byte(i)}, EndKey: []byte{byte(i + 1)}, - Peers: []*metapb.Peer{peer}, + Peers: peers, } regions = append(regions, r) req := &pdpb.RegionHeartbeatRequest{ Header: newHeader(s.srv), Region: r, - Leader: peer, + Leader: peers[0], } err := s.regionHeartbeat.Send(req) c.Assert(err, IsNil) @@ -218,7 +250,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { r, leader, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)}) c.Assert(err, IsNil) if i > 0 && i < regionLen { - return c.Check(leader, DeepEquals, peer) && + return c.Check(leader, DeepEquals, peers[0]) && c.Check(r, DeepEquals, regions[i-1]) } return c.Check(leader, IsNil) && @@ -229,19 +261,19 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { } func (s *testClientSuite) TestGetRegionByID(c *C) { - regionID, _ := regionIDAllocator.Alloc() + regionID := regionIDAllocator.alloc() region := &metapb.Region{ Id: regionID, RegionEpoch: &metapb.RegionEpoch{ ConfVer: 1, Version: 1, }, - Peers: []*metapb.Peer{peer}, + Peers: peers, } req := &pdpb.RegionHeartbeatRequest{ Header: newHeader(s.srv), Region: region, - Leader: peer, + Leader: peers[0], } err := s.regionHeartbeat.Send(req) c.Assert(err, IsNil) @@ -250,7 +282,7 @@ func (s *testClientSuite) TestGetRegionByID(c *C) { r, leader, err := s.client.GetRegionByID(context.Background(), regionID) c.Assert(err, IsNil) return c.Check(r, DeepEquals, region) && - c.Check(leader, DeepEquals, peer) + c.Check(leader, DeepEquals, peers[0]) }) c.Succeed() } @@ -258,6 +290,7 @@ func (s *testClientSuite) TestGetRegionByID(c *C) { func (s *testClientSuite) TestGetStore(c *C) { cluster := s.srv.GetRaftCluster() c.Assert(cluster, NotNil) + store := stores[0] // Get an up store should be OK. n, err := s.client.GetStore(context.Background(), store.GetId()) @@ -266,7 +299,7 @@ func (s *testClientSuite) TestGetStore(c *C) { stores, err := s.client.GetAllStores(context.Background()) c.Assert(err, IsNil) - c.Assert(stores, DeepEquals, []*metapb.Store{store}) + c.Assert(stores, DeepEquals, stores) // Mark the store as offline. err = cluster.RemoveStore(store.GetId()) @@ -280,9 +313,16 @@ func (s *testClientSuite) TestGetStore(c *C) { c.Assert(n, DeepEquals, offlineStore) // Should return offline stores. + contains := false stores, err = s.client.GetAllStores(context.Background()) c.Assert(err, IsNil) - c.Assert(stores, DeepEquals, []*metapb.Store{offlineStore}) + for _, store := range stores { + if store.GetId() == offlineStore.GetId() { + contains = true + c.Assert(store, DeepEquals, offlineStore) + } + } + c.Assert(contains, IsTrue) // Mark the store as tombstone. err = cluster.BuryStore(store.GetId(), true) @@ -296,14 +336,23 @@ func (s *testClientSuite) TestGetStore(c *C) { c.Assert(n, IsNil) // Should return tombstone stores. + contains = false stores, err = s.client.GetAllStores(context.Background()) c.Assert(err, IsNil) - c.Assert(stores, DeepEquals, []*metapb.Store{tombstoneStore}) + for _, store := range stores { + if store.GetId() == tombstoneStore.GetId() { + contains = true + c.Assert(store, DeepEquals, tombstoneStore) + } + } + c.Assert(contains, IsTrue) // Should not return tombstone stores. stores, err = s.client.GetAllStores(context.Background(), WithExcludeTombstone()) c.Assert(err, IsNil) - c.Assert(stores, IsNil) + for _, store := range stores { + c.Assert(store, Not(Equals), tombstoneStore) + } } func (s *testClientSuite) checkGCSafePoint(c *C, expectedSafePoint uint64) { @@ -329,3 +378,34 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) { c.Assert(err, IsNil) s.checkGCSafePoint(c, math.MaxUint64) } + +func (s *testClientSuite) TestScatterRegion(c *C) { + regionID := regionIDAllocator.alloc() + region := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: peers, + } + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(s.srv), + Region: region, + Leader: peers[0], + } + err := s.regionHeartbeat.Send(req) + c.Assert(err, IsNil) + testutil.WaitUntil(c, func(c *C) bool { + err := s.client.ScatterRegion(context.Background(), regionID) + if c.Check(err, NotNil) { + return false + } + resp, err := s.client.GetOperator(context.Background(), regionID) + if c.Check(err, NotNil) { + return false + } + return c.Check(resp.GetRegionId(), Equals, regionID) && c.Check(string(resp.GetDesc()), Equals, "scatter-region") && c.Check(resp.GetStatus(), Equals, pdpb.OperatorStatus_RUNNING) + }) + c.Succeed() +}