Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add GetOperator service #1477

Merged
merged 9 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.10.1 // indirect
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ github.com/pingcap/errors v0.10.1 h1:fGVuPMtwNcxbzQ3aoRyyi6kxvXKMkEsceP81f3b8wsk
github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8 h1:ZoP49RWRjlmvXUWAySYZD1tV8BIVVEJ7xrbCg1B7/fw=
github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d h1:LJYJl+cBhkkTWD79n+n9Bp4agQ85SdF9YKY4zEtL9Kw=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
32 changes: 32 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (s *Server) GetStore(ctx context.Context, request *pdpb.GetStoreRequest) (*
return &pdpb.GetStoreResponse{
Header: s.header(),
Store: store.GetMeta(),
Stats: store.GetStoreStats(),
}, nil
}

Expand Down Expand Up @@ -688,6 +689,37 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
}, nil
}

// GetOperator reports the operator that belongs to the specified region,
// together with its status.
//
// The request header is validated first. When GetRaftCluster returns nil the
// response carries the not-bootstrapped header, and when the operator
// controller has no status for the region the response carries a
// REGION_NOT_FOUND error header. Both of those cases return a nil error.
func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) {
	if err := s.validateRequest(request.GetHeader()); err != nil {
		return nil, err
	}

	rc := s.GetRaftCluster()
	if rc == nil {
		return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
	}

	regionID := request.GetRegionId()
	found := rc.coordinator.opController.GetOperatorStatus(regionID)
	if found != nil {
		// Desc and Kind are sent as raw bytes of their string forms.
		return &pdpb.GetOperatorResponse{
			Header:   s.header(),
			RegionId: regionID,
			Desc:     []byte(found.Op.Desc()),
			Kind:     []byte(found.Op.Kind().String()),
			Status:   found.Status,
		}, nil
	}

	notFound := &pdpb.Error{
		Type:    pdpb.ErrorType_REGION_NOT_FOUND,
		Message: "Not Found",
	}
	return &pdpb.GetOperatorResponse{Header: s.errorHeader(notFound)}, nil
}

// validateRequest checks if Server is leader and clusterID is matched.
// TODO: Call it in gRPC intercepter.
func (s *Server) validateRequest(header *pdpb.RequestHeader) error {
Expand Down
96 changes: 51 additions & 45 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,55 +376,61 @@ func (mc *MockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, follo
return core.NewRegionInfo(region, leader)
}

// ApplyOperator mocks apply oeprator.
// ApplyOperatorStep mocks applying the operator's current step to the region.
func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator) *core.RegionInfo {
if step := op.Check(region); step != nil {
switch s := step.(type) {
case TransferLeader:
region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore)))
case AddPeer:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add peer that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithAddPeer(peer))
case RemovePeer:
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add learner that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
IsLearner: true,
}
region = region.Clone(core.WithAddPeer(peer))
case PromoteLearner:
if region.GetStoreLearner(s.ToStore) == nil {
panic("promote peer that doesn't exist")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to keep the capitalization of the first letter consistent with the other panic messages.

}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(peer))
default:
panic("Unknown operator step")
}
}
return region
}

// ApplyOperator mocks apply operator.
func (mc *MockCluster) ApplyOperator(op *Operator) {
origin := mc.GetRegion(op.RegionID())
region := origin
for !op.IsFinish() {
if step := op.Check(region); step != nil {
switch s := step.(type) {
case TransferLeader:
region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore)))
case AddPeer:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add peer that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithAddPeer(peer))
case RemovePeer:
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add learner that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
IsLearner: true,
}
region = region.Clone(core.WithAddPeer(peer))
case PromoteLearner:
if region.GetStoreLearner(s.ToStore) == nil {
panic("promote peer that doesn't exist")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(peer))
default:
panic("Unknown operator step")
}
}
region = mc.ApplyOperatorStep(region, op)
}
mc.PutRegion(region)
for id := range region.GetStoreIds() {
Expand Down
59 changes: 58 additions & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"go.uber.org/zap"
)
Expand All @@ -41,6 +42,7 @@ type OperatorController struct {
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
}

// NewOperatorController creates a OperatorController.
Expand All @@ -51,6 +53,7 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *Operato
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
}
}

Expand All @@ -69,10 +72,12 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
oc.pushHistory(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
oc.RemoveOperator(op)
} else if timeout {
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
oc.RemoveOperator(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
}
}
}
Expand All @@ -85,6 +90,7 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
for _, op := range ops {
if !oc.checkAddOperator(op) {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
return false
}
}
Expand Down Expand Up @@ -131,6 +137,7 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
if old, ok := oc.operators[regionID]; ok {
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
oc.removeOperatorLocked(old)
}

Expand All @@ -154,6 +161,19 @@ func (oc *OperatorController) RemoveOperator(op *Operator) {
oc.removeOperatorLocked(op)
}

// GetOperatorStatus returns the operator registered for the specified region
// id together with its status.
//
// An operator that is still present in the controller's operator map is
// reported as RUNNING. Otherwise the answer comes from the operator records,
// which return nil when nothing is stored for the id.
func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
	oc.Lock()
	defer oc.Unlock()

	running, found := oc.operators[id]
	if !found {
		return oc.opRecords.Get(id)
	}
	return &OperatorWithStatus{
		Op:     running,
		Status: pdpb.OperatorStatus_RUNNING,
	}
}

func (oc *OperatorController) removeOperatorLocked(op *Operator) {
regionID := op.RegionID()
delete(oc.operators, regionID)
Expand Down Expand Up @@ -401,9 +421,46 @@ func (s StoreInfluence) ResourceSize(kind core.ResourceKind) int64 {
}
}

// SetOperator stores op in the operator map under its region ID while holding
// the controller lock. It is only used for test.
func (oc *OperatorController) SetOperator(op *Operator) {
	oc.Lock()
	defer oc.Unlock()

	regionID := op.RegionID()
	oc.operators[regionID] = op
}

// OperatorWithStatus pairs an operator with the status reported for it.
type OperatorWithStatus struct {
	// Op is the operator the status refers to.
	Op *Operator
	// Status is the status reported for Op.
	Status pdpb.OperatorStatus
}

// OperatorRecords retains operators and their statuses for a while.
type OperatorRecords struct {
	// ttl is the cache holding the records; Put keys entries by the
	// operator's region ID.
	ttl *cache.TTL
}

// NewOperatorRecords returns an OperatorRecords backed by a TTL cache.
func NewOperatorRecords() *OperatorRecords {
return &OperatorRecords{
ttl: cache.NewTTL(time.Minute, 10*time.Minute),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this parameter configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's needed; the records only have to be kept for a while.

}
}

// Get returns the operator and status stored under id, or nil when the cache
// has no entry for that id.
func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus {
	if cached, ok := o.ttl.Get(id); ok {
		return cached.(*OperatorWithStatus)
	}
	return nil
}

// Put stores op together with status in the cache, keyed by the operator's
// region ID.
func (o *OperatorRecords) Put(op *Operator, status pdpb.OperatorStatus) {
	o.ttl.Put(op.regionID, &OperatorWithStatus{
		Op:     op,
		Status: status,
	})
}
41 changes: 41 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testOperatorControllerSuite{})
Expand Down Expand Up @@ -51,3 +53,42 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
time.Sleep(1 * time.Second)
c.Assert(oc.GetOperator(2), NotNil)
}

// mockHeadbeatStream is a heartbeat stream stub for tests.
type mockHeadbeatStream struct{}

// SendMsg does nothing; the region and response are ignored.
func (m mockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {}

// TestOperatorStatus checks the status returned by GetOperatorStatus while
// operators are registered, after one is backdated and dispatched, and after
// the other has been fully applied and dispatched.
func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
	cluster := NewMockCluster(NewMockSchedulerOptions())
	oc := NewOperatorController(cluster, nil)
	oc.hbStreams = mockHeadbeatStream{}

	cluster.AddLeaderStore(1, 2)
	cluster.AddLeaderStore(2, 0)
	cluster.AddLeaderRegion(1, 1, 2)
	cluster.AddLeaderRegion(2, 1, 2)

	// Both operators are built from the same step list.
	steps := []OperatorStep{
		RemovePeer{FromStore: 2},
		AddPeer{ToStore: 2, PeerID: 4},
	}
	op1 := NewOperator("testOperator", 1, &metapb.RegionEpoch{}, OpRegion, steps...)
	op2 := NewOperator("testOperator", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
	region1 := cluster.GetRegion(1)
	region2 := cluster.GetRegion(2)

	// Phase 1: both operators are registered, so both must report RUNNING.
	oc.SetOperator(op1)
	oc.SetOperator(op2)
	for _, regionID := range []uint64{1, 2} {
		c.Assert(oc.GetOperatorStatus(regionID).Status, Equals, pdpb.OperatorStatus_RUNNING)
	}

	// Phase 2: backdate op1 by ten minutes and advance op2 by a single step.
	// After dispatching, op1 is expected to be TIMEOUT and op2 still RUNNING.
	op1.createTime = time.Now().Add(-10 * time.Minute)
	region2 = cluster.ApplyOperatorStep(region2, op2)
	cluster.PutRegion(region2)
	oc.Dispatch(region1)
	oc.Dispatch(region2)
	c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT)
	c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING)

	// Phase 3: apply the rest of op2; the next dispatch is expected to record
	// it as SUCCESS.
	cluster.ApplyOperator(op2)
	oc.Dispatch(region2)
	c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}
20 changes: 10 additions & 10 deletions server/schedule/operator_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var flagToName = map[OperatorKind]string{
OpLeader: "leader",
OpRegion: "region",
OpAdmin: "admin",
OpHotRegion: "hotRegion",
OpHotRegion: "hot-region",
OpAdjacent: "adjacent",
OpReplica: "replica",
OpBalance: "balance",
Expand All @@ -49,15 +49,15 @@ var flagToName = map[OperatorKind]string{
}

var nameToFlag = map[string]OperatorKind{
"leader": OpLeader,
"region": OpRegion,
"admin": OpAdmin,
"hotRegion": OpHotRegion,
"adjacent": OpAdjacent,
"replica": OpReplica,
"balance": OpBalance,
"merge": OpMerge,
"range": OpRange,
"leader": OpLeader,
"region": OpRegion,
"admin": OpAdmin,
"hot-region": OpHotRegion,
"adjacent": OpAdjacent,
"replica": OpReplica,
"balance": OpBalance,
"merge": OpMerge,
"range": OpRange,
}

func (k OperatorKind) String() string {
Expand Down