Skip to content

Commit

Permalink
*: add trend api. (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Dec 26, 2017
1 parent 7ebda11 commit 5598c00
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 229 deletions.
78 changes: 0 additions & 78 deletions server/api/history.go

This file was deleted.

107 changes: 0 additions & 107 deletions server/api/history_test.go

This file was deleted.

7 changes: 3 additions & 4 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router := mux.NewRouter().PathPrefix(prefix).Subrouter()
handler := svr.GetHandler()

historyHanlder := newHistoryHandler(handler, rd)
router.HandleFunc("/api/v1/history", historyHanlder.GetOperators).Methods("GET")
router.HandleFunc("/api/v1/history/{kind}/{limit}", historyHanlder.GetOperatorsOfKind).Methods("GET")

operatorHandler := newOperatorHandler(handler, rd)
router.HandleFunc("/api/v1/operators", operatorHandler.List).Methods("GET")
router.HandleFunc("/api/v1/operators", operatorHandler.Post).Methods("POST")
Expand Down Expand Up @@ -101,6 +97,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
statsHandler := newStatsHandler(svr, rd)
router.HandleFunc("/api/v1/stats/region", statsHandler.Region).Methods("GET")

trendHandler := newTrendHandler(svr, rd)
router.HandleFunc("/api/v1/trend", trendHandler.Handle).Methods("GET")

router.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET")
return router
}
183 changes: 183 additions & 0 deletions server/api/trend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"
"strconv"
"time"

"github.com/juju/errors"
"github.com/pingcap/pd/pkg/typeutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"github.com/unrolled/render"
)

// Trend describes the cluster's schedule trend.
type Trend struct {
Stores []trendStore `json:"stores"`
History *trendHistory `json:"history"`
}

type trendStore struct {
ID uint64 `json:"id"`
Address string `json:"address"`
StateName string `json:"state_name"`
Capacity uint64 `json:"capacity"`
Available uint64 `json:"available"`
RegionCount int `json:"region_count"`
LeaderCount int `json:"leader_count"`
StartTS *time.Time `json:"start_ts,omitempty"`
LastHeartbeatTS *time.Time `json:"last_heartbeat_ts,omitempty"`
Uptime *typeutil.Duration `json:"uptime,omitempty"`

HotWriteFlow uint64 `json:"hot_write_flow"`
HotWriteRegionFlows []uint64 `json:"hot_write_region_flows"`
HotReadFlow uint64 `json:"hot_read_flow"`
HotReadRegionFlows []uint64 `json:"hot_read_region_flows"`
}

type trendHistory struct {
StartTime int64 `json:"start"`
EndTime int64 `json:"end"`
Entries []trendHistoryEntry `json:"entries"`
}

type trendHistoryEntry struct {
From uint64 `json:"from"`
To uint64 `json:"to"`
Kind string `json:"kind"`
Count int `json:"count"`
}

type trendHandler struct {
*server.Handler
svr *server.Server
rd *render.Render
}

func newTrendHandler(s *server.Server, rd *render.Render) *trendHandler {
return &trendHandler{
Handler: s.GetHandler(),
svr: s,
rd: rd,
}
}

func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
var from time.Time
if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 {
fromInt, err := strconv.ParseInt(fromStr[0], 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
from = time.Unix(fromInt, 0)
}

stores, err := h.getTrendStores()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

history, err := h.getTrendHistory(from)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

trend := Trend{
Stores: stores,
History: history,
}
h.rd.JSON(w, http.StatusOK, trend)
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

var readStats, writeStats core.StoreHotRegionsStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
}
if hotWrite := h.GetHotWriteRegions(); hotWrite != nil {
writeStats = hotWrite.AsPeer
}
stores, err := h.GetStores()
if err != nil {
return nil, errors.Trace(err)
}

trendStores := make([]trendStore, 0, len(stores))
for _, store := range stores {
info := newStoreInfo(store, maxStoreDownTime)
s := trendStore{
ID: info.Store.GetId(),
Address: info.Store.GetAddress(),
StateName: info.Store.StateName,
Capacity: uint64(info.Status.Capacity),
Available: uint64(info.Status.Available),
RegionCount: info.Status.RegionCount,
LeaderCount: info.Status.LeaderCount,
StartTS: info.Status.StartTS,
LastHeartbeatTS: info.Status.LastHeartbeatTS,
Uptime: info.Status.Uptime,
}
s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetId())
s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetId())
trendStores = append(trendStores, s)
}
return trendStores, nil
}

func (h *trendHandler) getStoreFlow(stats core.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) {
if stats == nil {
return
}
if stat, ok := stats[storeID]; ok {
storeFlow = stat.TotalFlowBytes
for _, flow := range stat.RegionsStat {
regionFlows = append(regionFlows, flow.FlowBytes)
}
}
return
}

func (h *trendHandler) getTrendHistory(start time.Time) (*trendHistory, error) {
operatorHistory, err := h.GetHistory(start)
if err != nil {
return nil, errors.Trace(err)
}
// Use a tmp map to merge same histories together.
historyMap := make(map[trendHistoryEntry]int)
for _, entry := range operatorHistory {
historyMap[trendHistoryEntry{
From: entry.From,
To: entry.To,
Kind: entry.Kind.String(),
}]++
}
history := make([]trendHistoryEntry, 0, len(historyMap))
for entry, count := range historyMap {
entry.Count = count
history = append(history, entry)
}
return &trendHistory{
StartTime: start.Unix(),
EndTime: time.Now().Unix(),
Entries: history,
}, nil
}
Loading

0 comments on commit 5598c00

Please sign in to comment.