Skip to content

Commit

Permalink
tiny change
Browse files Browse the repository at this point in the history
Signed-off-by: rleungx <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 13, 2019
1 parent 3150ede commit e68f2c8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 39 deletions.
24 changes: 12 additions & 12 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand All @@ -291,7 +291,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
c.Assert(err, IsNil)
cfg.DisableLearner = false
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -413,7 +413,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -463,7 +463,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -515,7 +515,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
co.run()
Expand Down Expand Up @@ -574,7 +574,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -668,7 +668,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

// Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
Expand Down Expand Up @@ -763,7 +763,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

c.Assert(tc.addLeaderRegion(1, 1), IsNil)
Expand Down Expand Up @@ -841,7 +841,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down
6 changes: 3 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,19 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
}

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
Expand Down
1 change: 0 additions & 1 deletion server/heartbeat_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
return 0
}
}

req := &pdpb.RegionHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Leader: s.region.Peers[0],
Expand Down
27 changes: 5 additions & 22 deletions server/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/server/core"
Expand All @@ -44,11 +43,10 @@ type heartbeatStreams struct {
streams map[uint64]heartbeatStream
msgCh chan *pdpb.RegionHeartbeatResponse
streamCh chan streamUpdate
kv *core.KV
cluster *clusterInfo
}

func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *heartbeatStreams {
func newHeartbeatStreams(clusterID uint64, cluster *clusterInfo) *heartbeatStreams {
ctx, cancel := context.WithCancel(context.Background())
hs := &heartbeatStreams{
ctx: ctx,
Expand All @@ -57,7 +55,6 @@ func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *h
streams: make(map[uint64]heartbeatStream),
msgCh: make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
streamCh: make(chan streamUpdate, 1),
kv: kv,
cluster: cluster,
}
hs.wg.Add(1)
Expand All @@ -80,17 +77,10 @@ func (s *heartbeatStreams) run() {
case update := <-s.streamCh:
s.streams[update.storeID] = update.stream
case msg := <-s.msgCh:
var storeAddress string
store := &metapb.Store{}
storeID := msg.GetTargetPeer().GetStoreId()
var storeAddress string
if s.cluster == nil {
ok, err := s.kv.LoadStore(storeID, store)
if err != nil {
log.Errorf("[region %v] failed to load store %v: %v", msg.RegionId, storeID, err)
}
if ok {
storeAddress = store.GetAddress()
}
continue
} else {
storeAddress = s.cluster.GetStore(storeID).GetAddress()
}
Expand All @@ -109,15 +99,8 @@ func (s *heartbeatStreams) run() {
case <-keepAliveTicker.C:
for storeID, stream := range s.streams {
var storeAddress string
store := &metapb.Store{}
if s.cluster == nil {
ok, err := s.kv.LoadStore(storeID, store)
if err != nil {
log.Errorf("[store %v] failed to load store: %v", storeID, err)
}
if ok {
storeAddress = store.GetAddress()
}
continue
} else {
storeAddress = s.cluster.GetStore(storeID).GetAddress()
}
Expand Down Expand Up @@ -167,7 +150,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeAddress string) {
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *Server) startServer() error {
}
s.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
s.cluster = newRaftCluster(s, s.clusterID)
s.hbStreams = newHeartbeatStreams(s.clusterID, s.kv, s.cluster.cachedCluster)
s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster.cachedCluster)
if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil {
return err
}
Expand Down

0 comments on commit e68f2c8

Please sign in to comment.