From 5598c003af5945a1f694bdbf5f069b5ea1de84db Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 26 Dec 2017 15:16:38 +0800 Subject: [PATCH] *: add trend api. (#881) --- server/api/history.go | 78 --------------- server/api/history_test.go | 107 -------------------- server/api/router.go | 7 +- server/api/trend.go | 183 +++++++++++++++++++++++++++++++++++ server/api/trend_test.go | 101 +++++++++++++++++++ server/cluster.go | 1 + server/coordinator.go | 60 ++++++------ server/core/resource_kind.go | 11 +++ server/handler.go | 34 ++++--- server/schedule/operator.go | 40 ++++++++ 10 files changed, 393 insertions(+), 229 deletions(-) delete mode 100644 server/api/history.go delete mode 100644 server/api/history_test.go create mode 100644 server/api/trend.go create mode 100644 server/api/trend_test.go diff --git a/server/api/history.go b/server/api/history.go deleted file mode 100644 index 48c1f77c3ea..00000000000 --- a/server/api/history.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "net/http" - "strconv" - - "github.com/gorilla/mux" - "github.com/juju/errors" - "github.com/pingcap/pd/server" - "github.com/pingcap/pd/server/schedule" - "github.com/unrolled/render" -) - -var errUnknownOperatorKind = errors.New("Unknown operator kind") - -type historyHandler struct { - *server.Handler - r *render.Render -} - -func newHistoryHandler(handler *server.Handler, r *render.Render) *historyHandler { - return &historyHandler{ - Handler: handler, - r: r, - } -} - -func (h *historyHandler) GetOperators(w http.ResponseWriter, r *http.Request) { - ops, err := h.GetHistoryOperators() - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - h.r.JSON(w, http.StatusOK, ops) -} - -func (h *historyHandler) GetOperatorsOfKind(w http.ResponseWriter, r *http.Request) { - k := mux.Vars(r)["kind"] - l := mux.Vars(r)["limit"] - limit, err := strconv.Atoi(l) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - if limit <= 0 { - h.r.JSON(w, http.StatusOK, nil) - return - } - kind, err := schedule.ParseOperatorKind(k) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - ops, err := h.GetHistoryOperatorsOfKind(kind) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - if limit > len(ops) { - limit = len(ops) - } - - h.r.JSON(w, http.StatusOK, ops[:limit]) -} diff --git a/server/api/history_test.go b/server/api/history_test.go deleted file mode 100644 index cdf9d92d72a..00000000000 --- a/server/api/history_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "encoding/json" - "fmt" - "net/http" - - . "github.com/pingcap/check" - "github.com/pingcap/pd/server" -) - -var _ = Suite(&testHistorySuite{}) - -type testHistorySuite struct { - svr *server.Server - cleanup cleanUpFunc - urlPrefix string - cli *http.Client -} - -func (s *testHistorySuite) SetUpSuite(c *C) { - s.svr, s.cleanup = mustNewServer(c) - mustWaitLeader(c, []*server.Server{s.svr}) - - addr := s.svr.GetAddr() - s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) - - mustBootstrapCluster(c, s.svr) - s.cli = newHTTPClient() - - r := newTestRegionInfo(2, 1, []byte("a"), []byte("b")) - mustRegionHeartbeat(c, s.svr, r) - - r = newTestRegionInfo(3, 1, []byte("b"), []byte("f")) - mustRegionHeartbeat(c, s.svr, r) -} - -func (s *testHistorySuite) TearDownSuite(c *C) { - s.cleanup() -} - -func addTransferLeaderOperator(cli *http.Client, urlPrefix string, regionID uint64, storeID uint64) error { - req := map[string]interface{}{ - "name": "transfer-leader", - "region_id": regionID, - "to_store_id": storeID, - } - data, err := json.Marshal(req) - if err != nil { - return err - } - url := fmt.Sprintf("%s/operators", urlPrefix) - return postJSON(cli, url, data) -} - -func (s *testHistorySuite) TestHistroyOperators(c *C) { - err := addTransferLeaderOperator(s.cli, s.urlPrefix, 2, 1) - c.Assert(err, IsNil) - err = addTransferLeaderOperator(s.cli, s.urlPrefix, 3, 1) - c.Assert(err, IsNil) - - // gets all history - url := fmt.Sprintf("%s/history", s.urlPrefix) - resp, err := s.cli.Get(url) - c.Assert(err, IsNil) - res := []interface{}{} - err = readJSON(resp.Body, &res) - c.Assert(err, IsNil) - c.Assert(len(res), Equals, 2) - - // gets history by kind and limit - tbl := []struct { - kind string - limit int - result int - }{ - {"admin", 0, 0}, - {"admin", -1, 0}, - {"admin", 1, 1}, - {"admin", 2, 2}, - {"admin", 3, 2}, - } - - for _, t := range tbl { - url = fmt.Sprintf("%s/history/%s/%d", s.urlPrefix, t.kind, t.limit) - resp, err = s.cli.Get(url) - c.Assert(resp.StatusCode, Equals, 200) - c.Assert(err, IsNil) - res = []interface{}{} - err = readJSON(resp.Body, &res) - c.Assert(err, IsNil) - c.Assert(len(res), Equals, t.result) - } -} diff --git a/server/api/router.go b/server/api/router.go index a8a61385faa..e6414b2cf4c 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -30,10 +30,6 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { router := mux.NewRouter().PathPrefix(prefix).Subrouter() handler := svr.GetHandler() - historyHanlder := newHistoryHandler(handler, rd) - router.HandleFunc("/api/v1/history", historyHanlder.GetOperators).Methods("GET") - router.HandleFunc("/api/v1/history/{kind}/{limit}", historyHanlder.GetOperatorsOfKind).Methods("GET") - operatorHandler := newOperatorHandler(handler, rd) router.HandleFunc("/api/v1/operators", operatorHandler.List).Methods("GET") router.HandleFunc("/api/v1/operators", operatorHandler.Post).Methods("POST") @@ -101,6 +97,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { statsHandler := newStatsHandler(svr, rd) router.HandleFunc("/api/v1/stats/region", statsHandler.Region).Methods("GET") + trendHandler := newTrendHandler(svr, rd) + router.HandleFunc("/api/v1/trend", trendHandler.Handle).Methods("GET") + router.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET") return router } diff --git a/server/api/trend.go b/server/api/trend.go new file mode 100644 index 00000000000..71cd1883767 --- /dev/null +++ b/server/api/trend.go @@ -0,0 +1,183 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + "strconv" + "time" + + "github.com/juju/errors" + "github.com/pingcap/pd/pkg/typeutil" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/core" + "github.com/unrolled/render" +) + +// Trend describes the cluster's schedule trend. +type Trend struct { + Stores []trendStore `json:"stores"` + History *trendHistory `json:"history"` +} + +type trendStore struct { + ID uint64 `json:"id"` + Address string `json:"address"` + StateName string `json:"state_name"` + Capacity uint64 `json:"capacity"` + Available uint64 `json:"available"` + RegionCount int `json:"region_count"` + LeaderCount int `json:"leader_count"` + StartTS *time.Time `json:"start_ts,omitempty"` + LastHeartbeatTS *time.Time `json:"last_heartbeat_ts,omitempty"` + Uptime *typeutil.Duration `json:"uptime,omitempty"` + + HotWriteFlow uint64 `json:"hot_write_flow"` + HotWriteRegionFlows []uint64 `json:"hot_write_region_flows"` + HotReadFlow uint64 `json:"hot_read_flow"` + HotReadRegionFlows []uint64 `json:"hot_read_region_flows"` +} + +type trendHistory struct { + StartTime int64 `json:"start"` + EndTime int64 `json:"end"` + Entries []trendHistoryEntry `json:"entries"` +} + +type trendHistoryEntry struct { + From uint64 `json:"from"` + To uint64 `json:"to"` + Kind string `json:"kind"` + Count int `json:"count"` +} + +type trendHandler struct { + *server.Handler + svr *server.Server + rd *render.Render +} + +func newTrendHandler(s *server.Server, rd *render.Render) *trendHandler { + return &trendHandler{ + Handler: s.GetHandler(), + svr: s, + rd: rd, + } +} + +func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) { + var from time.Time + if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 { + fromInt, err := strconv.ParseInt(fromStr[0], 10, 64) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + from = time.Unix(fromInt, 0) + } + + stores, err := h.getTrendStores() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + history, err := h.getTrendHistory(from) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + trend := Trend{ + Stores: stores, + History: history, + } + h.rd.JSON(w, http.StatusOK, trend) +} + +func (h *trendHandler) getTrendStores() ([]trendStore, error) { + maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration + + var readStats, writeStats core.StoreHotRegionsStat + if hotRead := h.GetHotReadRegions(); hotRead != nil { + readStats = hotRead.AsLeader + } + if hotWrite := h.GetHotWriteRegions(); hotWrite != nil { + writeStats = hotWrite.AsPeer + } + stores, err := h.GetStores() + if err != nil { + return nil, errors.Trace(err) + } + + trendStores := make([]trendStore, 0, len(stores)) + for _, store := range stores { + info := newStoreInfo(store, maxStoreDownTime) + s := trendStore{ + ID: info.Store.GetId(), + Address: info.Store.GetAddress(), + StateName: info.Store.StateName, + Capacity: uint64(info.Status.Capacity), + Available: uint64(info.Status.Available), + RegionCount: info.Status.RegionCount, + LeaderCount: info.Status.LeaderCount, + StartTS: info.Status.StartTS, + LastHeartbeatTS: info.Status.LastHeartbeatTS, + Uptime: info.Status.Uptime, + } + s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetId()) + s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetId()) + trendStores = append(trendStores, s) + } + return trendStores, nil +} + +func (h *trendHandler) getStoreFlow(stats core.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) { + if stats == nil { + return + } + if stat, ok := stats[storeID]; ok { + storeFlow = stat.TotalFlowBytes + for _, flow := range stat.RegionsStat { + regionFlows = append(regionFlows, flow.FlowBytes) + } + } + return +} + +func (h *trendHandler) getTrendHistory(start time.Time) (*trendHistory, error) { + operatorHistory, err := h.GetHistory(start) + if err != nil { + return nil, errors.Trace(err) + } + // Use a tmp map to merge same histories together. + historyMap := make(map[trendHistoryEntry]int) + for _, entry := range operatorHistory { + historyMap[trendHistoryEntry{ + From: entry.From, + To: entry.To, + Kind: entry.Kind.String(), + }]++ + } + history := make([]trendHistoryEntry, 0, len(historyMap)) + for entry, count := range historyMap { + entry.Count = count + history = append(history, entry) + } + return &trendHistory{ + StartTime: start.Unix(), + EndTime: time.Now().Unix(), + Entries: history, + }, nil +} diff --git a/server/api/trend_test.go b/server/api/trend_test.go new file mode 100644 index 00000000000..2b8d175d519 --- /dev/null +++ b/server/api/trend_test.go @@ -0,0 +1,101 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/core" +) + +var _ = Suite(&testTrendSuite{}) + +type testTrendSuite struct{} + +func (s *testTrendSuite) TestTend(c *C) { + svr, cleanup := mustNewServer(c) + defer cleanup() + mustWaitLeader(c, []*server.Server{svr}) + + mustBootstrapCluster(c, svr) + for i := 1; i <= 3; i++ { + mustPutStore(c, svr, uint64(i), metapb.StoreState_Up, nil) + } + + // Create 3 regions, all peers on store1 and store2, and the leaders are all on store1. + mustRegionHeartbeat(c, svr, s.newRegionInfo(4, "", "a", 2, 2, []uint64{1, 2}, 1)) + mustRegionHeartbeat(c, svr, s.newRegionInfo(5, "a", "b", 2, 2, []uint64{1, 2}, 1)) + mustRegionHeartbeat(c, svr, s.newRegionInfo(6, "b", "", 2, 2, []uint64{1, 2}, 1)) + + // Create 3 operators that transfers leader, moves follower, moves leader. + svr.GetHandler().AddTransferLeaderOperator(4, 2) + svr.GetHandler().AddTransferPeerOperator(5, 2, 3) + svr.GetHandler().AddTransferPeerOperator(6, 1, 3) + + // Complete the operators. + mustRegionHeartbeat(c, svr, s.newRegionInfo(4, "", "a", 2, 2, []uint64{1, 2}, 2)) + mustRegionHeartbeat(c, svr, s.newRegionInfo(5, "a", "b", 3, 2, []uint64{1, 3}, 1)) + mustRegionHeartbeat(c, svr, s.newRegionInfo(6, "b", "", 3, 2, []uint64{2, 3}, 2)) + + var trend Trend + err := readJSONWithURL(fmt.Sprintf("%s%s/api/v1/trend", svr.GetAddr(), apiPrefix), &trend) + c.Assert(err, IsNil) + + // Check store states. + expectLeaderCount := map[uint64]int{1: 1, 2: 2, 3: 0} + expectRegionCount := map[uint64]int{1: 2, 2: 2, 3: 2} + c.Assert(len(trend.Stores), Equals, 3) + for _, store := range trend.Stores { + c.Assert(store.LeaderCount, Equals, expectLeaderCount[store.ID]) + c.Assert(store.RegionCount, Equals, expectRegionCount[store.ID]) + } + + // Check history. + expectHistory := map[trendHistoryEntry]int{ + {From: 1, To: 2, Kind: "leader"}: 2, + {From: 1, To: 3, Kind: "region"}: 1, + {From: 2, To: 3, Kind: "region"}: 1, + } + c.Assert(len(trend.History.Entries), Equals, 3) + for _, history := range trend.History.Entries { + c.Assert(history.Count, Equals, expectHistory[trendHistoryEntry{From: history.From, To: history.To, Kind: history.Kind}]) + } +} + +func (s *testTrendSuite) newRegionInfo(id uint64, startKey, endKey string, confVer, ver uint64, stores []uint64, leaderStore uint64) *core.RegionInfo { + var ( + peers []*metapb.Peer + leader *metapb.Peer + ) + for _, id := range stores { + p := &metapb.Peer{Id: id, StoreId: id} + if id == leaderStore { + leader = p + } + peers = append(peers, p) + } + return &core.RegionInfo{ + Region: &metapb.Region{ + Id: id, + StartKey: []byte(startKey), + EndKey: []byte(endKey), + RegionEpoch: &metapb.RegionEpoch{ConfVer: confVer, Version: ver}, + Peers: peers, + }, + Leader: leader, + } +} diff --git a/server/cluster.go b/server/cluster.go index 090572dc2d9..7d07943d998 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -464,6 +464,7 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) { case <-ticker.C: c.checkStores() c.collectMetrics() + c.coordinator.pruneHistory() } } } diff --git a/server/coordinator.go b/server/coordinator.go index ae48083f126..a90b7bdfd42 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -14,6 +14,7 @@ package server import ( + "container/list" "context" "fmt" "sync" @@ -23,7 +24,6 @@ import ( "github.com/pingcap/kvproto/pkg/eraftpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/pd/server/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" @@ -33,8 +33,7 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second collectFactor = 0.8 - historiesCacheSize = 1000 - eventsCacheSize = 1000 + historyKeepTime = 5 * time.Minute maxScheduleRetries = 10 regionheartbeatSendChanCap = 1024 @@ -61,7 +60,7 @@ type coordinator struct { operators map[uint64]*schedule.Operator schedulers map[string]*scheduleController classifier namespace.Classifier - histories cache.Cache + histories *list.List hbStreams *heartbeatStreams } @@ -78,7 +77,7 @@ func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifie operators: make(map[uint64]*schedule.Operator), schedulers: make(map[string]*scheduleController), classifier: classifier, - histories: cache.NewDefaultCache(historiesCacheSize), + histories: list.New(), hbStreams: hbStreams, } } @@ -95,6 +94,7 @@ func (c *coordinator) dispatch(region *core.RegionInfo) { if op.IsFinish() { log.Infof("[region %v] operator finish: %s", region.GetId(), op) operatorCounter.WithLabelValues(op.Desc(), "finish").Inc() + c.pushHistory(op) c.removeOperator(op) } else if timeout { log.Infof("[region %v] operator timeout: %s", region.GetId(), op) @@ -366,7 +366,6 @@ func (c *coordinator) addOperator(op *schedule.Operator) bool { c.removeOperatorLocked(old) } - c.histories.Put(regionID, op) c.operators[regionID] = op c.limiter.UpdateCounts(c.operators) @@ -384,6 +383,25 @@ func isHigherPriorityOperator(new, old *schedule.Operator) bool { return new.GetPriorityLevel() < old.GetPriorityLevel() } +func (c *coordinator) pushHistory(op *schedule.Operator) { + c.Lock() + defer c.Unlock() + for _, h := range op.History() { + c.histories.PushFront(h) + } +} + +func (c *coordinator) pruneHistory() { + c.Lock() + defer c.Unlock() + p := c.histories.Back() + for p != nil && time.Since(p.Value.(schedule.OperatorHistory).FinishTime) > historyKeepTime { + prev := p.Prev() + c.histories.Remove(p) + p = prev + } +} + func (c *coordinator) removeOperator(op *schedule.Operator) { c.Lock() defer c.Unlock() @@ -394,7 +412,6 @@ func (c *coordinator) removeOperatorLocked(op *schedule.Operator) { regionID := op.RegionID() delete(c.operators, regionID) c.limiter.UpdateCounts(c.operators) - c.histories.Put(regionID, op) operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() } @@ -416,31 +433,18 @@ func (c *coordinator) getOperators() []*schedule.Operator { return operators } -func (c *coordinator) getHistories() []*schedule.Operator { - c.RLock() - defer c.RUnlock() - - var operators []*schedule.Operator - for _, elem := range c.histories.Elems() { - operators = append(operators, elem.Value.(*schedule.Operator)) - } - - return operators -} - -func (c *coordinator) getHistoriesOfKind(mask schedule.OperatorKind) []*schedule.Operator { +func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory { c.RLock() defer c.RUnlock() - - var operators []*schedule.Operator - for _, elem := range c.histories.Elems() { - op := elem.Value.(*schedule.Operator) - if op.Kind()&mask != 0 { - operators = append(operators, op) + histories := make([]schedule.OperatorHistory, 0, c.histories.Len()) + for p := c.histories.Front(); p != nil; p = p.Next() { + history := p.Value.(schedule.OperatorHistory) + if history.FinishTime.Before(start) { + break } + histories = append(histories, history) } - - return operators + return histories } func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) { diff --git a/server/core/resource_kind.go b/server/core/resource_kind.go index 5ba14bd5b1e..531a24323d4 100644 --- a/server/core/resource_kind.go +++ b/server/core/resource_kind.go @@ -32,3 +32,14 @@ const ( // RegionKind indicates the region kind resource RegionKind ) + +func (k ResourceKind) String() string { + switch k { + case LeaderKind: + return "leader" + case RegionKind: + return "region" + default: + return "unknown" + } +} diff --git a/server/handler.go b/server/handler.go index 27dc4ed4da3..b5ab0effa21 100644 --- a/server/handler.go +++ b/server/handler.go @@ -15,6 +15,7 @@ package server import ( "strconv" + "time" "github.com/juju/errors" "github.com/pingcap/pd/server/core" @@ -54,6 +55,24 @@ func (h *Handler) GetSchedulers() ([]string, error) { return c.getSchedulers(), nil } +// GetStores returns all stores in the cluster. +func (h *Handler) GetStores() ([]*core.StoreInfo, error) { + cluster := h.s.GetRaftCluster() + if cluster == nil { + return nil, errors.Trace(errNotBootstrapped) + } + storeMetas := cluster.GetStores() + stores := make([]*core.StoreInfo, 0, len(storeMetas)) + for _, s := range storeMetas { + store, err := cluster.GetStore(s.GetId()) + if err != nil { + return nil, errors.Trace(err) + } + stores = append(stores, store) + } + return stores, nil +} + // GetHotWriteRegions gets all hot write regions status func (h *Handler) GetHotWriteRegions() *core.StoreHotRegionInfos { c, err := h.getCoordinator() @@ -225,22 +244,13 @@ func (h *Handler) GetOperatorsOfKind(mask schedule.OperatorKind) ([]*schedule.Op return results, nil } -// GetHistoryOperators returns history operators -func (h *Handler) GetHistoryOperators() ([]*schedule.Operator, error) { - c, err := h.getCoordinator() - if err != nil { - return nil, errors.Trace(err) - } - return c.getHistories(), nil -} - -// GetHistoryOperatorsOfKind returns history operators by Kind -func (h *Handler) GetHistoryOperatorsOfKind(mask schedule.OperatorKind) ([]*schedule.Operator, error) { +// GetHistory returns finished operators' history since start. +func (h *Handler) GetHistory(start time.Time) ([]schedule.OperatorHistory, error) { c, err := h.getCoordinator() if err != nil { return nil, errors.Trace(err) } - return c.getHistoriesOfKind(mask), nil + return c.getHistory(start), nil } // AddTransferLeaderOperator adds an operator to transfer leader to the store. diff --git a/server/schedule/operator.go b/server/schedule/operator.go index b53b59d33e7..e8ff8f88015 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -221,6 +221,46 @@ func (o *Operator) Influence(opInfluence OpInfluence, region *core.RegionInfo) { } } +// OperatorHistory is used to log and visualize completed operators. +type OperatorHistory struct { + FinishTime time.Time + From, To uint64 + Kind core.ResourceKind +} + +// History transfers the operator's steps to operator histories. +func (o *Operator) History() []OperatorHistory { + now := time.Now() + var histories []OperatorHistory + var addPeerStores, removePeerStores []uint64 + for _, step := range o.steps { + switch s := step.(type) { + case TransferLeader: + histories = append(histories, OperatorHistory{ + FinishTime: now, + From: s.FromStore, + To: s.ToStore, + Kind: core.LeaderKind, + }) + case AddPeer: + addPeerStores = append(addPeerStores, s.ToStore) + case RemovePeer: + removePeerStores = append(removePeerStores, s.FromStore) + } + } + for i := range addPeerStores { + if i < len(removePeerStores) { + histories = append(histories, OperatorHistory{ + FinishTime: now, + From: removePeerStores[i], + To: addPeerStores[i], + Kind: core.RegionKind, + }) + } + } + return histories +} + // CreateRemovePeerOperator creates an Operator that removes a peer from region. // It prevents removing leader by tranfer its leadership first. func CreateRemovePeerOperator(desc string, kind OperatorKind, region *core.RegionInfo, storeID uint64) *Operator {