Skip to content

Commit

Permalink
*: add GetOperator service (#1477)
Browse files Browse the repository at this point in the history
* *: add GetOperator service

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored Apr 8, 2019
1 parent cffb759 commit 9ad0db5
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.10.1 // indirect
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ github.com/pingcap/errors v0.10.1 h1:fGVuPMtwNcxbzQ3aoRyyi6kxvXKMkEsceP81f3b8wsk
github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8 h1:ZoP49RWRjlmvXUWAySYZD1tV8BIVVEJ7xrbCg1B7/fw=
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d h1:LJYJl+cBhkkTWD79n+n9Bp4agQ85SdF9YKY4zEtL9Kw=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
32 changes: 32 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (s *Server) GetStore(ctx context.Context, request *pdpb.GetStoreRequest) (*
return &pdpb.GetStoreResponse{
Header: s.header(),
Store: store.GetMeta(),
Stats: store.GetStoreStats(),
}, nil
}

Expand Down Expand Up @@ -688,6 +689,37 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
}, nil
}

// GetOperator reports the operator associated with the requested region
// together with its status.
//
// The request header is validated first. If the raft cluster is not available,
// the response carries only the not-bootstrapped header. If the operator
// controller has nothing to report for the region, the response carries a
// REGION_NOT_FOUND error header.
func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) {
	if err := s.validateRequest(request.GetHeader()); err != nil {
		return nil, err
	}

	rc := s.GetRaftCluster()
	if rc == nil {
		return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
	}

	controller := rc.coordinator.opController
	regionID := request.GetRegionId()
	record := controller.GetOperatorStatus(regionID)
	if record == nil {
		notFound := &pdpb.Error{
			Type:    pdpb.ErrorType_REGION_NOT_FOUND,
			Message: "Not Found",
		}
		return &pdpb.GetOperatorResponse{Header: s.errorHeader(notFound)}, nil
	}

	// Description and kind are sent as the byte form of their string values.
	resp := &pdpb.GetOperatorResponse{
		Header:   s.header(),
		RegionId: regionID,
		Desc:     []byte(record.Op.Desc()),
		Kind:     []byte(record.Op.Kind().String()),
		Status:   record.Status,
	}
	return resp, nil
}

// validateRequest checks if Server is leader and clusterID is matched.
// TODO: Call it in gRPC interceptor.
func (s *Server) validateRequest(header *pdpb.RequestHeader) error {
Expand Down
96 changes: 51 additions & 45 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,55 +376,61 @@ func (mc *MockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, follo
return core.NewRegionInfo(region, leader)
}

// ApplyOperator mocks apply oeprator.
// ApplyOperatorStep simulates applying one step of op to region and returns
// the resulting region.
//
// The step to apply is whatever op.Check(region) returns; when that is nil the
// region is returned unchanged. The function panics if the step is
// inconsistent with the region (for example adding a peer that already exists
// or removing the leader) or if the step type is not handled here.
func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator) *core.RegionInfo {
	step := op.Check(region)
	if step == nil {
		return region
	}

	switch s := step.(type) {
	case TransferLeader:
		newLeader := region.GetStorePeer(s.ToStore)
		return region.Clone(core.WithLeader(newLeader))

	case AddPeer:
		if region.GetStorePeer(s.ToStore) != nil {
			panic("Add peer that exists")
		}
		added := &metapb.Peer{Id: s.PeerID, StoreId: s.ToStore}
		return region.Clone(core.WithAddPeer(added))

	case RemovePeer:
		if region.GetStorePeer(s.FromStore) == nil {
			panic("Remove peer that doesn't exist")
		}
		if region.GetLeader().GetStoreId() == s.FromStore {
			panic("Cannot remove the leader peer")
		}
		return region.Clone(core.WithRemoveStorePeer(s.FromStore))

	case AddLearner:
		if region.GetStorePeer(s.ToStore) != nil {
			panic("Add learner that exists")
		}
		learner := &metapb.Peer{Id: s.PeerID, StoreId: s.ToStore, IsLearner: true}
		return region.Clone(core.WithAddPeer(learner))

	case PromoteLearner:
		if region.GetStoreLearner(s.ToStore) == nil {
			panic("Promote peer that doesn't exist")
		}
		// Promotion is modelled as dropping the store's existing peer and
		// adding a peer on the same store without the learner flag.
		voter := &metapb.Peer{Id: s.PeerID, StoreId: s.ToStore}
		return region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(voter))

	default:
		panic("Unknown operator step")
	}
}

// ApplyOperator mocks apply operator.
func (mc *MockCluster) ApplyOperator(op *Operator) {
origin := mc.GetRegion(op.RegionID())
region := origin
for !op.IsFinish() {
if step := op.Check(region); step != nil {
switch s := step.(type) {
case TransferLeader:
region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore)))
case AddPeer:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add peer that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithAddPeer(peer))
case RemovePeer:
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add learner that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
IsLearner: true,
}
region = region.Clone(core.WithAddPeer(peer))
case PromoteLearner:
if region.GetStoreLearner(s.ToStore) == nil {
panic("promote peer that doesn't exist")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(peer))
default:
panic("Unknown operator step")
}
}
region = mc.ApplyOperatorStep(region, op)
}
mc.PutRegion(region)
for id := range region.GetStoreIds() {
Expand Down
61 changes: 60 additions & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"go.uber.org/zap"
)
Expand All @@ -41,6 +42,7 @@ type OperatorController struct {
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
}

// NewOperatorController creates a OperatorController.
Expand All @@ -51,6 +53,7 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *Operato
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
}
}

Expand All @@ -69,10 +72,12 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
oc.pushHistory(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
oc.RemoveOperator(op)
} else if timeout {
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
oc.RemoveOperator(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
}
}
}
Expand All @@ -85,6 +90,7 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
for _, op := range ops {
if !oc.checkAddOperator(op) {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
return false
}
}
Expand Down Expand Up @@ -131,6 +137,7 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
if old, ok := oc.operators[regionID]; ok {
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
oc.removeOperatorLocked(old)
}

Expand All @@ -154,6 +161,19 @@ func (oc *OperatorController) RemoveOperator(op *Operator) {
oc.removeOperatorLocked(op)
}

// GetOperatorStatus returns the operator registered under id along with its
// status. An operator still present in the controller's operator map is
// reported as RUNNING; otherwise the result is whatever the operator records
// hold for id, which is nil when there is no record.
func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
	oc.Lock()
	defer oc.Unlock()

	running, found := oc.operators[id]
	if !found {
		return oc.opRecords.Get(id)
	}
	return &OperatorWithStatus{
		Op:     running,
		Status: pdpb.OperatorStatus_RUNNING,
	}
}

func (oc *OperatorController) removeOperatorLocked(op *Operator) {
regionID := op.RegionID()
delete(oc.operators, regionID)
Expand Down Expand Up @@ -401,9 +421,48 @@ func (s StoreInfluence) ResourceSize(kind core.ResourceKind) int64 {
}
}

// SetOperator is only used for test
// SetOperator stores op directly in the controller's operator map, keyed by
// the operator's region ID, while holding the controller lock. It is only
// used for test.
func (oc *OperatorController) SetOperator(op *Operator) {
	oc.Lock()
	defer oc.Unlock()

	regionID := op.RegionID()
	oc.operators[regionID] = op
}

// OperatorWithStatus pairs an operator with the status reported for it.
type OperatorWithStatus struct {
// Op is the operator the status refers to.
Op *Operator
// Status is the operator's status as a pdpb.OperatorStatus value.
Status pdpb.OperatorStatus
}

// OperatorRecords keeps operators and their statuses in a TTL cache so they
// can still be looked up for a while.
type OperatorRecords struct {
// ttl holds the records; Put keys each entry by the operator's region ID.
ttl *cache.TTL
}

// operatorStatusRemainTime is passed to cache.NewTTL as its second argument
// when the operator records are created.
// NOTE(review): presumably the time-to-live of each record — confirm against
// the cache package.
const operatorStatusRemainTime = 10 * time.Minute

// NewOperatorRecords returns an OperatorRecords backed by a freshly created
// TTL cache.
func NewOperatorRecords() *OperatorRecords {
	records := new(OperatorRecords)
	records.ttl = cache.NewTTL(time.Minute, operatorStatusRemainTime)
	return records
}

// Get returns the operator and status recorded under id, or nil when the
// cache has no entry for it.
func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus {
	if cached, ok := o.ttl.Get(id); ok {
		return cached.(*OperatorWithStatus)
	}
	return nil
}

// Put records op together with status, keyed by the operator's region ID.
func (o *OperatorRecords) Put(op *Operator, status pdpb.OperatorStatus) {
	o.ttl.Put(op.regionID, &OperatorWithStatus{Op: op, Status: status})
}
41 changes: 41 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testOperatorControllerSuite{})
Expand Down Expand Up @@ -51,3 +53,42 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
time.Sleep(1 * time.Second)
c.Assert(oc.GetOperator(2), NotNil)
}

// mockHeadbeatStream is a stand-in heartbeat stream for tests; it discards
// every message it is asked to send.
type mockHeadbeatStream struct{}

// SendMsg intentionally does nothing with region and msg.
func (m mockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
}

// TestOperatorStatus checks the status reported by GetOperatorStatus as
// operators go from running to timed out or finished.
func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
// The controller was built with nil heartbeat streams; install a mock
// whose SendMsg does nothing.
oc.hbStreams = mockHeadbeatStream{}
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
// Both operators share the same two steps: remove the peer on store 2,
// then add a peer with ID 4 on store 2.
steps := []OperatorStep{
RemovePeer{FromStore: 2},
AddPeer{ToStore: 2, PeerID: 4},
}
op1 := NewOperator("testOperator", 1, &metapb.RegionEpoch{}, OpRegion, steps...)
op2 := NewOperator("testOperator", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
region1 := tc.GetRegion(1)
region2 := tc.GetRegion(2)
// Register the operators directly; each must be reported as RUNNING.
oc.SetOperator(op1)
oc.SetOperator(op2)
c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING)
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING)
// Backdate op1's creation time by ten minutes; the assertion after
// Dispatch expects it to be recorded as TIMEOUT.
op1.createTime = time.Now().Add(-10 * time.Minute)
// Apply a single step of op2 and store the resulting region, so op2 is
// expected to still be running after Dispatch.
region2 = tc.ApplyOperatorStep(region2, op2)
tc.PutRegion(region2)
oc.Dispatch(region1)
oc.Dispatch(region2)
c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT)
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING)
// Apply op2 in full; the next Dispatch is expected to record SUCCESS.
tc.ApplyOperator(op2)
oc.Dispatch(region2)
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}
20 changes: 10 additions & 10 deletions server/schedule/operator_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var flagToName = map[OperatorKind]string{
OpLeader: "leader",
OpRegion: "region",
OpAdmin: "admin",
OpHotRegion: "hotRegion",
OpHotRegion: "hot-region",
OpAdjacent: "adjacent",
OpReplica: "replica",
OpBalance: "balance",
Expand All @@ -49,15 +49,15 @@ var flagToName = map[OperatorKind]string{
}

var nameToFlag = map[string]OperatorKind{
"leader": OpLeader,
"region": OpRegion,
"admin": OpAdmin,
"hotRegion": OpHotRegion,
"adjacent": OpAdjacent,
"replica": OpReplica,
"balance": OpBalance,
"merge": OpMerge,
"range": OpRange,
"leader": OpLeader,
"region": OpRegion,
"admin": OpAdmin,
"hot-region": OpHotRegion,
"adjacent": OpAdjacent,
"replica": OpReplica,
"balance": OpBalance,
"merge": OpMerge,
"range": OpRange,
}

func (k OperatorKind) String() string {
Expand Down

0 comments on commit 9ad0db5

Please sign in to comment.