diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index c3520d235ec..82a4766f057 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -29,10 +29,12 @@ import ( "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" "github.com/joho/godotenv" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/response" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" @@ -124,6 +126,7 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterCheckersRouter() s.RegisterHotspotRouter() s.RegisterRegionsRouter() + s.RegisterStoresRouter() return s } @@ -173,9 +176,19 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// RegisterStoresRouter registers the router of the stores handler. +func (s *Service) RegisterStoresRouter() { + router := s.root.Group("stores") + router.GET("", getAllStores) + router.GET("/:id", getStoreByID) +} + // RegisterRegionsRouter registers the router of the regions handler. func (s *Service) RegisterRegionsRouter() { router := s.root.Group("regions") + router.GET("", getAllRegions) + router.GET("/:id", getRegionByID) + router.GET("/count", getRegionCount) router.POST("/accelerate-schedule", accelerateRegionsScheduleInRange) router.POST("/accelerate-schedule/batch", accelerateRegionsScheduleInRanges) router.POST("/scatter", scatterRegions) @@ -1318,3 +1331,115 @@ func checkRegionsReplicated(c *gin.Context) { } c.IndentedJSON(http.StatusOK, state) } + +// @Tags store +// @Summary Get a store's information. +// @Param id path integer true "Store Id" +// @Produce json +// @Success 200 {object} response.StoreInfo +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The store does not exist." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /stores/{id} [get] +func getStoreByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + idStr := c.Param("id") + storeID, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + store := svr.GetBasicCluster().GetStore(storeID) + if store == nil { + c.String(http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) + return + } + + storeInfo := response.BuildStoreInfo(&svr.GetConfig().Schedule, store) + c.IndentedJSON(http.StatusOK, storeInfo) +} + +// @Tags store +// @Summary Get all stores in the cluster. +// @Produce json +// @Success 200 {object} response.StoresInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /stores [get] +func getAllStores(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + stores := svr.GetBasicCluster().GetMetaStores() + StoresInfo := &response.StoresInfo{ + Stores: make([]*response.StoreInfo, 0, len(stores)), + } + + for _, s := range stores { + storeID := s.GetId() + store := svr.GetBasicCluster().GetStore(storeID) + if store == nil { + c.String(http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) + return + } + if store.GetMeta().State == metapb.StoreState_Tombstone { + continue + } + storeInfo := response.BuildStoreInfo(&svr.GetConfig().Schedule, store) + StoresInfo.Stores = append(StoresInfo.Stores, storeInfo) + } + StoresInfo.Count = len(StoresInfo.Stores) + c.IndentedJSON(http.StatusOK, StoresInfo) +} + +// @Tags region +// @Summary List all regions in the cluster. +// @Produce json +// @Success 200 {object} response.RegionsInfo +// @Router /regions [get] +func getAllRegions(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + regions := svr.GetBasicCluster().GetRegions() + b, err := response.MarshalRegionsInfoJSON(c.Request.Context(), regions) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.Data(http.StatusOK, "application/json", b) +} + +// @Tags region +// @Summary Get count of regions. +// @Produce json +// @Success 200 {object} response.RegionsInfo +// @Router /regions/count [get] +func getRegionCount(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + count := svr.GetBasicCluster().GetTotalRegionCount() + c.IndentedJSON(http.StatusOK, &response.RegionsInfo{Count: count}) +} + +// @Tags region +// @Summary Search for a region by region ID. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {object} response.RegionInfo +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/{id} [get] +func getRegionByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + idStr := c.Param("id") + regionID, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + regionInfo := svr.GetBasicCluster().GetRegion(regionID) + if regionInfo == nil { + c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error()) + return + } + b, err := response.MarshalRegionInfoJSON(c.Request.Context(), regionInfo) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.Data(http.StatusOK, "application/json", b) +} diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 925b28763b5..2daa6766d75 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -78,6 +78,7 @@ func (w *Watcher) initializeStoreWatcher() error { zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + log.Debug("update store meta", zap.Stringer("store", store)) origin := w.basicCluster.GetStore(store.GetId()) if origin == nil { w.basicCluster.PutStore(core.NewStoreInfo(store)) @@ -101,6 +102,7 @@ func (w *Watcher) initializeStoreWatcher() error { origin := w.basicCluster.GetStore(storeID) if origin != nil { w.basicCluster.DeleteStore(origin) + log.Info("delete store meta", zap.Uint64("store-id", storeID)) } return nil } diff --git a/pkg/response/region.go b/pkg/response/region.go new file mode 100644 index 00000000000..153294c2861 --- /dev/null +++ b/pkg/response/region.go @@ -0,0 +1,275 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package response + +import ( + "context" + + "github.com/mailru/easyjson/jwriter" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/replication_modepb" + "github.com/tikv/pd/pkg/core" +) + +// MetaPeer is api compatible with *metapb.Peer. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type MetaPeer struct { + *metapb.Peer + // RoleName is `Role.String()`. + // Since Role is serialized as int by json by default, + // introducing it will make the output of pd-ctl easier to identify Role. + RoleName string `json:"role_name"` + // IsLearner is `Role == "Learner"`. + // Since IsLearner was changed to Role in kvproto in 5.0, this field was introduced to ensure api compatibility. + IsLearner bool `json:"is_learner,omitempty"` +} + +func (m *MetaPeer) setDefaultIfNil() { + if m.Peer == nil { + m.Peer = &metapb.Peer{ + Id: m.GetId(), + StoreId: m.GetStoreId(), + Role: m.GetRole(), + IsWitness: m.GetIsWitness(), + } + } +} + +// PDPeerStats is api compatible with *pdpb.PeerStats. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type PDPeerStats struct { + *pdpb.PeerStats + Peer MetaPeer `json:"peer"` +} + +func (s *PDPeerStats) setDefaultIfNil() { + if s.PeerStats == nil { + s.PeerStats = &pdpb.PeerStats{ + Peer: s.GetPeer(), + DownSeconds: s.GetDownSeconds(), + } + } + s.Peer.setDefaultIfNil() +} + +func fromPeer(peer *metapb.Peer) MetaPeer { + if peer == nil { + return MetaPeer{} + } + return MetaPeer{ + Peer: peer, + RoleName: peer.GetRole().String(), + IsLearner: core.IsLearner(peer), + } +} + +func fromPeerSlice(peers []*metapb.Peer) []MetaPeer { + if peers == nil { + return nil + } + slice := make([]MetaPeer, len(peers)) + for i, peer := range peers { + slice[i] = fromPeer(peer) + } + return slice +} + +func fromPeerStats(peer *pdpb.PeerStats) PDPeerStats { + return PDPeerStats{ + PeerStats: peer, + Peer: fromPeer(peer.Peer), + } +} + +func fromPeerStatsSlice(peers []*pdpb.PeerStats) []PDPeerStats { + if peers == nil { + return nil + } + slice := make([]PDPeerStats, len(peers)) + for i, peer := range peers { + slice[i] = fromPeerStats(peer) + } + return slice +} + +// RegionInfo records detail region info for api usage. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// easyjson:json +type RegionInfo struct { + ID uint64 `json:"id"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + RegionEpoch *metapb.RegionEpoch `json:"epoch,omitempty"` + Peers []MetaPeer `json:"peers,omitempty"` + + Leader MetaPeer `json:"leader,omitempty"` + DownPeers []PDPeerStats `json:"down_peers,omitempty"` + PendingPeers []MetaPeer `json:"pending_peers,omitempty"` + CPUUsage uint64 `json:"cpu_usage"` + WrittenBytes uint64 `json:"written_bytes"` + ReadBytes uint64 `json:"read_bytes"` + WrittenKeys uint64 `json:"written_keys"` + ReadKeys uint64 `json:"read_keys"` + ApproximateSize int64 `json:"approximate_size"` + ApproximateKeys int64 `json:"approximate_keys"` + Buckets []string `json:"buckets,omitempty"` + + ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` +} + +// ReplicationStatus represents the replication mode status of the region. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type ReplicationStatus struct { + State string `json:"state"` + StateID uint64 `json:"state_id"` +} + +func fromPBReplicationStatus(s *replication_modepb.RegionReplicationStatus) *ReplicationStatus { + if s == nil { + return nil + } + return &ReplicationStatus{ + State: s.GetState().String(), + StateID: s.GetStateId(), + } +} + +// NewAPIRegionInfo create a new API RegionInfo. +func NewAPIRegionInfo(r *core.RegionInfo) *RegionInfo { + return InitRegion(r, &RegionInfo{}) +} + +// InitRegion init a new API RegionInfo from the core.RegionInfo. +func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo { + if r == nil { + return nil + } + + s.ID = r.GetID() + s.StartKey = core.HexRegionKeyStr(r.GetStartKey()) + s.EndKey = core.HexRegionKeyStr(r.GetEndKey()) + s.RegionEpoch = r.GetRegionEpoch() + s.Peers = fromPeerSlice(r.GetPeers()) + s.Leader = fromPeer(r.GetLeader()) + s.DownPeers = fromPeerStatsSlice(r.GetDownPeers()) + s.PendingPeers = fromPeerSlice(r.GetPendingPeers()) + s.CPUUsage = r.GetCPUUsage() + s.WrittenBytes = r.GetBytesWritten() + s.WrittenKeys = r.GetKeysWritten() + s.ReadBytes = r.GetBytesRead() + s.ReadKeys = r.GetKeysRead() + s.ApproximateSize = r.GetApproximateSize() + s.ApproximateKeys = r.GetApproximateKeys() + s.ReplicationStatus = fromPBReplicationStatus(r.GetReplicationStatus()) + s.Buckets = nil + + keys := r.GetBuckets().GetKeys() + if len(keys) > 0 { + s.Buckets = make([]string, len(keys)) + for i, key := range keys { + s.Buckets[i] = core.HexRegionKeyStr(key) + } + } + return s +} + +// Adjust is only used in testing, in order to compare the data from json deserialization. +func (r *RegionInfo) Adjust() { + for _, peer := range r.DownPeers { + // Since api.PDPeerStats uses the api.MetaPeer type variable Peer to overwrite PeerStats.Peer, + // it needs to be restored after deserialization to be completely consistent with the original. + peer.PeerStats.Peer = peer.Peer.Peer + } +} + +// RegionsInfo contains some regions with the detailed region info. +type RegionsInfo struct { + Count int `json:"count"` + Regions []RegionInfo `json:"regions"` +} + +// Adjust is only used in testing, in order to compare the data from json deserialization. +func (s *RegionsInfo) Adjust() { + for _, r := range s.Regions { + r.Adjust() + } +} + +// MarshalRegionInfoJSON marshals region to bytes in `RegionInfo`'s JSON format. +// It is used to reduce the cost of JSON serialization. +func MarshalRegionInfoJSON(ctx context.Context, r *core.RegionInfo) ([]byte, error) { + out := &jwriter.Writer{} + + region := &RegionInfo{} + select { + case <-ctx.Done(): + // Return early, avoid the unnecessary computation. + // See more details in https://github.com/tikv/pd/issues/6835 + return nil, ctx.Err() + default: + } + + covertAPIRegionInfo(r, region, out) + return out.Buffer.BuildBytes(), out.Error +} + +// MarshalRegionsInfoJSON marshals regions to bytes in `RegionsInfo`'s JSON format. +// It is used to reduce the cost of JSON serialization. +func MarshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) { + out := &jwriter.Writer{} + out.RawByte('{') + + out.RawString("\"count\":") + out.Int(len(regions)) + + out.RawString(",\"regions\":") + out.RawByte('[') + region := &RegionInfo{} + for i, r := range regions { + select { + case <-ctx.Done(): + // Return early, avoid the unnecessary computation. + // See more details in https://github.com/tikv/pd/issues/6835 + return nil, ctx.Err() + default: + } + if i > 0 { + out.RawByte(',') + } + covertAPIRegionInfo(r, region, out) + } + out.RawByte(']') + + out.RawByte('}') + return out.Buffer.BuildBytes(), out.Error +} + +func covertAPIRegionInfo(r *core.RegionInfo, region *RegionInfo, out *jwriter.Writer) { + InitRegion(r, region) + // EasyJSON will not check anonymous struct pointer field and will panic if the field is nil. + // So we need to set the field to default value explicitly when the anonymous struct pointer is nil. + region.Leader.setDefaultIfNil() + for i := range region.Peers { + region.Peers[i].setDefaultIfNil() + } + for i := range region.PendingPeers { + region.PendingPeers[i].setDefaultIfNil() + } + for i := range region.DownPeers { + region.DownPeers[i].setDefaultIfNil() + } + region.MarshalEasyJSON(out) +} diff --git a/server/api/region_easyjson.go b/pkg/response/region_easyjson.go similarity index 99% rename from server/api/region_easyjson.go rename to pkg/response/region_easyjson.go index 4bd9fe69e42..33598360235 100644 --- a/server/api/region_easyjson.go +++ b/pkg/response/region_easyjson.go @@ -1,6 +1,6 @@ // Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. -package api +package response import ( json "encoding/json" diff --git a/pkg/response/region_test.go b/pkg/response/region_test.go new file mode 100644 index 00000000000..fb29e7dbe21 --- /dev/null +++ b/pkg/response/region_test.go @@ -0,0 +1,70 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package response + +import ( + "encoding/json" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" +) + +func TestPeer(t *testing.T) { + re := require.New(t) + peers := []*metapb.Peer{ + {Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, + {Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, + {Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, + } + // float64 is the default numeric type for JSON + expected := []map[string]interface{}{ + {"id": float64(1), "store_id": float64(10), "role_name": "Voter"}, + {"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, + {"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, + {"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, + } + + data, err := json.Marshal(fromPeerSlice(peers)) + re.NoError(err) + var ret []map[string]interface{} + re.NoError(json.Unmarshal(data, &ret)) + re.Equal(expected, ret) +} + +func TestPeerStats(t *testing.T) { + re := require.New(t) + peers := []*pdpb.PeerStats{ + {Peer: &metapb.Peer{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, DownSeconds: 0}, + {Peer: &metapb.Peer{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, DownSeconds: 1}, + {Peer: &metapb.Peer{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, DownSeconds: 2}, + {Peer: &metapb.Peer{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, DownSeconds: 3}, + } + // float64 is the default numeric type for JSON + expected := []map[string]interface{}{ + {"peer": map[string]interface{}{"id": float64(1), "store_id": float64(10), "role_name": "Voter"}}, + {"peer": map[string]interface{}{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, "down_seconds": float64(1)}, + {"peer": map[string]interface{}{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, "down_seconds": float64(2)}, + {"peer": map[string]interface{}{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, "down_seconds": float64(3)}, + } + + data, err := json.Marshal(fromPeerStatsSlice(peers)) + re.NoError(err) + var ret []map[string]interface{} + re.NoError(json.Unmarshal(data, &ret)) + re.Equal(expected, ret) +} diff --git a/pkg/response/store.go b/pkg/response/store.go new file mode 100644 index 00000000000..6aebd65b8cf --- /dev/null +++ b/pkg/response/store.go @@ -0,0 +1,150 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package response + +import ( + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/utils/typeutil" +) + +// MetaStore contains meta information about a store. +type MetaStore struct { + *metapb.Store + StateName string `json:"state_name"` +} + +// SlowTrend contains slow trend information about a store. +type SlowTrend struct { + // CauseValue is the slow trend detecting raw input, it changes by the performance and pressure along time of the store. + // The value itself is not important, what matter is: + // - The comparition result from store to store. + // - The change magnitude along time (represented by CauseRate). + // Currently it's one of store's internal latency (duration of waiting in the task queue of raftstore.store). + CauseValue float64 `json:"cause_value"` + // CauseRate is for mesuring the change magnitude of CauseValue of the store, + // - CauseRate > 0 means the store is become slower currently + // - CauseRate < 0 means the store is become faster currently + // - CauseRate == 0 means the store's performance and pressure does not have significant changes + CauseRate float64 `json:"cause_rate"` + // ResultValue is the current gRPC QPS of the store. + ResultValue float64 `json:"result_value"` + // ResultRate is for mesuring the change magnitude of ResultValue of the store. + ResultRate float64 `json:"result_rate"` +} + +// StoreStatus contains status about a store. +type StoreStatus struct { + Capacity typeutil.ByteSize `json:"capacity"` + Available typeutil.ByteSize `json:"available"` + UsedSize typeutil.ByteSize `json:"used_size"` + LeaderCount int `json:"leader_count"` + LeaderWeight float64 `json:"leader_weight"` + LeaderScore float64 `json:"leader_score"` + LeaderSize int64 `json:"leader_size"` + RegionCount int `json:"region_count"` + RegionWeight float64 `json:"region_weight"` + RegionScore float64 `json:"region_score"` + RegionSize int64 `json:"region_size"` + LearnerCount int `json:"learner_count,omitempty"` + WitnessCount int `json:"witness_count,omitempty"` + SlowScore uint64 `json:"slow_score,omitempty"` + SlowTrend *SlowTrend `json:"slow_trend,omitempty"` + SendingSnapCount uint32 `json:"sending_snap_count,omitempty"` + ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"` + IsBusy bool `json:"is_busy,omitempty"` + StartTS *time.Time `json:"start_ts,omitempty"` + LastHeartbeatTS *time.Time `json:"last_heartbeat_ts,omitempty"` + Uptime *typeutil.Duration `json:"uptime,omitempty"` +} + +// StoreInfo contains information about a store. +type StoreInfo struct { + Store *MetaStore `json:"store"` + Status *StoreStatus `json:"status"` +} + +const ( + // DisconnectedName is the name when store is disconnected. + DisconnectedName = "Disconnected" + // DownStateName is the name when store is down. + DownStateName = "Down" +) + +// BuildStoreInfo builds a storeInfo response. +func BuildStoreInfo(opt *sc.ScheduleConfig, store *core.StoreInfo) *StoreInfo { + var slowTrend *SlowTrend + coreSlowTrend := store.GetSlowTrend() + if coreSlowTrend != nil { + slowTrend = &SlowTrend{coreSlowTrend.CauseValue, coreSlowTrend.CauseRate, coreSlowTrend.ResultValue, coreSlowTrend.ResultRate} + } + s := &StoreInfo{ + Store: &MetaStore{ + Store: store.GetMeta(), + StateName: store.GetState().String(), + }, + Status: &StoreStatus{ + Capacity: typeutil.ByteSize(store.GetCapacity()), + Available: typeutil.ByteSize(store.GetAvailable()), + UsedSize: typeutil.ByteSize(store.GetUsedSize()), + LeaderCount: store.GetLeaderCount(), + LeaderWeight: store.GetLeaderWeight(), + LeaderScore: store.LeaderScore(constant.StringToSchedulePolicy(opt.LeaderSchedulePolicy), 0), + LeaderSize: store.GetLeaderSize(), + RegionCount: store.GetRegionCount(), + RegionWeight: store.GetRegionWeight(), + RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0), + RegionSize: store.GetRegionSize(), + LearnerCount: store.GetLearnerCount(), + WitnessCount: store.GetWitnessCount(), + SlowScore: store.GetSlowScore(), + SlowTrend: slowTrend, + SendingSnapCount: store.GetSendingSnapCount(), + ReceivingSnapCount: store.GetReceivingSnapCount(), + IsBusy: store.IsBusy(), + }, + } + + if store.GetStoreStats() != nil { + startTS := store.GetStartTime() + s.Status.StartTS = &startTS + } + if lastHeartbeat := store.GetLastHeartbeatTS(); !lastHeartbeat.IsZero() { + s.Status.LastHeartbeatTS = &lastHeartbeat + } + if upTime := store.GetUptime(); upTime > 0 { + duration := typeutil.NewDuration(upTime) + s.Status.Uptime = &duration + } + + if store.GetState() == metapb.StoreState_Up { + if store.DownTime() > opt.MaxStoreDownTime.Duration { + s.Store.StateName = DownStateName + } else if store.IsDisconnected() { + s.Store.StateName = DisconnectedName + } + } + return s +} + +// StoresInfo records stores' info. +type StoresInfo struct { + Count int `json:"count"` + Stores []*StoreInfo `json:"stores"` +} diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 759e6747cf9..658ac079d0f 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -252,11 +252,11 @@ func CheckStream(ctx context.Context, cancel context.CancelFunc, done chan struc // NeedRebuildConnection checks if the error is a connection error. func NeedRebuildConnection(err error) bool { - return err == io.EOF || + return (err != nil) && (err == io.EOF || strings.Contains(err.Error(), codes.Unavailable.String()) || // Unavailable indicates the service is currently unavailable. This is a most likely a transient condition. strings.Contains(err.Error(), codes.DeadlineExceeded.String()) || // DeadlineExceeded means operation expired before completion. strings.Contains(err.Error(), codes.Internal.String()) || // Internal errors. strings.Contains(err.Error(), codes.Unknown.String()) || // Unknown error. - strings.Contains(err.Error(), codes.ResourceExhausted.String()) // ResourceExhausted is returned when either the client or the server has exhausted their resources. + strings.Contains(err.Error(), codes.ResourceExhausted.String())) // ResourceExhausted is returned when either the client or the server has exhausted their resources. // Besides, we don't need to rebuild the connection if the code is Canceled, which means the client cancelled the request. } diff --git a/server/api/label.go b/server/api/label.go index b7f279d86cc..ead6b30ae26 100644 --- a/server/api/label.go +++ b/server/api/label.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -65,7 +66,7 @@ func (h *labelsHandler) GetLabels(w http.ResponseWriter, r *http.Request) { // @Param name query string true "name of store label filter" // @Param value query string true "value of store label filter" // @Produce json -// @Success 200 {object} StoresInfo +// @Success 200 {object} response.StoresInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /labels/stores [get] func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) { @@ -79,8 +80,8 @@ func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) } stores := rc.GetMetaStores() - storesInfo := &StoresInfo{ - Stores: make([]*StoreInfo, 0, len(stores)), + storesInfo := &response.StoresInfo{ + Stores: make([]*response.StoreInfo, 0, len(stores)), } stores = filter.filter(stores) @@ -92,7 +93,7 @@ func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) return } - storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store) + storeInfo := response.BuildStoreInfo(h.svr.GetScheduleConfig(), store) storesInfo.Stores = append(storesInfo.Stores, storeInfo) } storesInfo.Count = len(storesInfo.Stores) diff --git a/server/api/label_test.go b/server/api/label_test.go index 9bcd40aac01..d085b2a2ea0 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/response" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" @@ -177,7 +178,7 @@ func (suite *labelsStoreTestSuite) TestStoresLabelFilter() { re := suite.Require() for _, testCase := range testCases { url := fmt.Sprintf("%s/labels/stores?name=%s&value=%s", suite.urlPrefix, testCase.name, testCase.value) - info := new(StoresInfo) + info := new(response.StoresInfo) err := tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, testCase.want) diff --git a/server/api/region.go b/server/api/region.go index abcc1e4d3b2..ef9e48a38e8 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -16,7 +16,6 @@ package api import ( "container/heap" - "context" "fmt" "net/http" "net/url" @@ -25,14 +24,11 @@ import ( "strings" "github.com/gorilla/mux" - jwriter "github.com/mailru/easyjson/jwriter" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -40,192 +36,6 @@ import ( "github.com/unrolled/render" ) -// MetaPeer is api compatible with *metapb.Peer. -// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. -type MetaPeer struct { - *metapb.Peer - // RoleName is `Role.String()`. - // Since Role is serialized as int by json by default, - // introducing it will make the output of pd-ctl easier to identify Role. - RoleName string `json:"role_name"` - // IsLearner is `Role == "Learner"`. - // Since IsLearner was changed to Role in kvproto in 5.0, this field was introduced to ensure api compatibility. - IsLearner bool `json:"is_learner,omitempty"` -} - -func (m *MetaPeer) setDefaultIfNil() { - if m.Peer == nil { - m.Peer = &metapb.Peer{ - Id: m.GetId(), - StoreId: m.GetStoreId(), - Role: m.GetRole(), - IsWitness: m.GetIsWitness(), - } - } -} - -// PDPeerStats is api compatible with *pdpb.PeerStats. -// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. -type PDPeerStats struct { - *pdpb.PeerStats - Peer MetaPeer `json:"peer"` -} - -func (s *PDPeerStats) setDefaultIfNil() { - if s.PeerStats == nil { - s.PeerStats = &pdpb.PeerStats{ - Peer: s.GetPeer(), - DownSeconds: s.GetDownSeconds(), - } - } - s.Peer.setDefaultIfNil() -} - -func fromPeer(peer *metapb.Peer) MetaPeer { - if peer == nil { - return MetaPeer{} - } - return MetaPeer{ - Peer: peer, - RoleName: peer.GetRole().String(), - IsLearner: core.IsLearner(peer), - } -} - -func fromPeerSlice(peers []*metapb.Peer) []MetaPeer { - if peers == nil { - return nil - } - slice := make([]MetaPeer, len(peers)) - for i, peer := range peers { - slice[i] = fromPeer(peer) - } - return slice -} - -func fromPeerStats(peer *pdpb.PeerStats) PDPeerStats { - return PDPeerStats{ - PeerStats: peer, - Peer: fromPeer(peer.Peer), - } -} - -func fromPeerStatsSlice(peers []*pdpb.PeerStats) []PDPeerStats { - if peers == nil { - return nil - } - slice := make([]PDPeerStats, len(peers)) - for i, peer := range peers { - slice[i] = fromPeerStats(peer) - } - return slice -} - -// RegionInfo records detail region info for api usage. -// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. -// easyjson:json -type RegionInfo struct { - ID uint64 `json:"id"` - StartKey string `json:"start_key"` - EndKey string `json:"end_key"` - RegionEpoch *metapb.RegionEpoch `json:"epoch,omitempty"` - Peers []MetaPeer `json:"peers,omitempty"` - - Leader MetaPeer `json:"leader,omitempty"` - DownPeers []PDPeerStats `json:"down_peers,omitempty"` - PendingPeers []MetaPeer `json:"pending_peers,omitempty"` - CPUUsage uint64 `json:"cpu_usage"` - WrittenBytes uint64 `json:"written_bytes"` - ReadBytes uint64 `json:"read_bytes"` - WrittenKeys uint64 `json:"written_keys"` - ReadKeys uint64 `json:"read_keys"` - ApproximateSize int64 `json:"approximate_size"` - ApproximateKvSize int64 `json:"approximate_kv_size"` - ApproximateKeys int64 `json:"approximate_keys"` - Buckets []string `json:"buckets,omitempty"` - - ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` -} - -// ReplicationStatus represents the replication mode status of the region. -// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. -type ReplicationStatus struct { - State string `json:"state"` - StateID uint64 `json:"state_id"` -} - -func fromPBReplicationStatus(s *replication_modepb.RegionReplicationStatus) *ReplicationStatus { - if s == nil { - return nil - } - return &ReplicationStatus{ - State: s.GetState().String(), - StateID: s.GetStateId(), - } -} - -// NewAPIRegionInfo create a new API RegionInfo. -func NewAPIRegionInfo(r *core.RegionInfo) *RegionInfo { - return InitRegion(r, &RegionInfo{}) -} - -// InitRegion init a new API RegionInfo from the core.RegionInfo. -func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo { - if r == nil { - return nil - } - - s.ID = r.GetID() - s.StartKey = core.HexRegionKeyStr(r.GetStartKey()) - s.EndKey = core.HexRegionKeyStr(r.GetEndKey()) - s.RegionEpoch = r.GetRegionEpoch() - s.Peers = fromPeerSlice(r.GetPeers()) - s.Leader = fromPeer(r.GetLeader()) - s.DownPeers = fromPeerStatsSlice(r.GetDownPeers()) - s.PendingPeers = fromPeerSlice(r.GetPendingPeers()) - s.CPUUsage = r.GetCPUUsage() - s.WrittenBytes = r.GetBytesWritten() - s.WrittenKeys = r.GetKeysWritten() - s.ReadBytes = r.GetBytesRead() - s.ReadKeys = r.GetKeysRead() - s.ApproximateSize = r.GetApproximateSize() - s.ApproximateKvSize = r.GetApproximateKvSize() - s.ApproximateKeys = r.GetApproximateKeys() - s.ReplicationStatus = fromPBReplicationStatus(r.GetReplicationStatus()) - s.Buckets = nil - - keys := r.GetBuckets().GetKeys() - if len(keys) > 0 { - s.Buckets = make([]string, len(keys)) - for i, key := range keys { - s.Buckets[i] = core.HexRegionKeyStr(key) - } - } - return s -} - -// Adjust is only used in testing, in order to compare the data from json deserialization. -func (r *RegionInfo) Adjust() { - for _, peer := range r.DownPeers { - // Since api.PDPeerStats uses the api.MetaPeer type variable Peer to overwrite PeerStats.Peer, - // it needs to be restored after deserialization to be completely consistent with the original. - peer.PeerStats.Peer = peer.Peer.Peer - } -} - -// RegionsInfo contains some regions with the detailed region info. -type RegionsInfo struct { - Count int `json:"count"` - Regions []RegionInfo `json:"regions"` -} - -// Adjust is only used in testing, in order to compare the data from json deserialization. -func (s *RegionsInfo) Adjust() { - for _, r := range s.Regions { - r.Adjust() - } -} - type regionHandler struct { svr *server.Server rd *render.Render @@ -242,7 +52,7 @@ func newRegionHandler(svr *server.Server, rd *render.Render) *regionHandler { // @Summary Search for a region by region ID. // @Param id path integer true "Region Id" // @Produce json -// @Success 200 {object} RegionInfo +// @Success 200 {object} response.RegionInfo // @Failure 400 {string} string "The input is invalid." // @Router /region/id/{id} [get] func (h *regionHandler) GetRegionByID(w http.ResponseWriter, r *http.Request) { @@ -257,7 +67,7 @@ func (h *regionHandler) GetRegionByID(w http.ResponseWriter, r *http.Request) { } regionInfo := rc.GetRegion(regionID) - b, err := marshalRegionInfoJSON(r.Context(), regionInfo) + b, err := response.MarshalRegionInfoJSON(r.Context(), regionInfo) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -270,7 +80,7 @@ func (h *regionHandler) GetRegionByID(w http.ResponseWriter, r *http.Request) { // @Summary Search for a region by a key. GetRegion is named to be consistent with gRPC // @Param key path string true "Region key" // @Produce json -// @Success 200 {object} RegionInfo +// @Success 200 {object} response.RegionInfo // @Router /region/key/{key} [get] func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) @@ -282,7 +92,7 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) { return } regionInfo := rc.GetRegionByKey([]byte(key)) - b, err := marshalRegionInfoJSON(r.Context(), regionInfo) + b, err := response.MarshalRegionInfoJSON(r.Context(), regionInfo) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -325,81 +135,15 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler { } } -// marshalRegionInfoJSON marshals region to bytes in `RegionInfo`'s JSON format. -// It is used to reduce the cost of JSON serialization. -func marshalRegionInfoJSON(ctx context.Context, r *core.RegionInfo) ([]byte, error) { - out := &jwriter.Writer{} - - region := &RegionInfo{} - select { - case <-ctx.Done(): - // Return early, avoid the unnecessary computation. - // See more details in https://github.com/tikv/pd/issues/6835 - return nil, ctx.Err() - default: - } - - covertAPIRegionInfo(r, region, out) - return out.Buffer.BuildBytes(), out.Error -} - -// marshalRegionsInfoJSON marshals regions to bytes in `RegionsInfo`'s JSON format. -// It is used to reduce the cost of JSON serialization. -func marshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) { - out := &jwriter.Writer{} - out.RawByte('{') - - out.RawString("\"count\":") - out.Int(len(regions)) - - out.RawString(",\"regions\":") - out.RawByte('[') - region := &RegionInfo{} - for i, r := range regions { - select { - case <-ctx.Done(): - // Return early, avoid the unnecessary computation. - // See more details in https://github.com/tikv/pd/issues/6835 - return nil, ctx.Err() - default: - } - if i > 0 { - out.RawByte(',') - } - covertAPIRegionInfo(r, region, out) - } - out.RawByte(']') - - out.RawByte('}') - return out.Buffer.BuildBytes(), out.Error -} - -func covertAPIRegionInfo(r *core.RegionInfo, region *RegionInfo, out *jwriter.Writer) { - InitRegion(r, region) - // EasyJSON will not check anonymous struct pointer field and will panic if the field is nil. - // So we need to set the field to default value explicitly when the anonymous struct pointer is nil. - region.Leader.setDefaultIfNil() - for i := range region.Peers { - region.Peers[i].setDefaultIfNil() - } - for i := range region.PendingPeers { - region.PendingPeers[i].setDefaultIfNil() - } - for i := range region.DownPeers { - region.DownPeers[i].setDefaultIfNil() - } - region.MarshalEasyJSON(out) -} - // @Tags region // @Summary List all regions in the cluster. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Router /regions [get] func (h *regionsHandler) GetRegions(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) regions := rc.GetRegions() - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -413,7 +157,7 @@ func (h *regionsHandler) GetRegions(w http.ResponseWriter, r *http.Request) { // @Param endkey query string true "Region range end key" // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/key [get] func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) { @@ -427,7 +171,7 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) { } regions := rc.ScanRegions([]byte(startKey), []byte(endKey), limit) - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -438,19 +182,19 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) { // @Tags region // @Summary Get count of regions. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Router /regions/count [get] func (h *regionsHandler) GetRegionCount(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) count := rc.GetTotalRegionCount() - h.rd.JSON(w, http.StatusOK, &RegionsInfo{Count: count}) + h.rd.JSON(w, http.StatusOK, &response.RegionsInfo{Count: count}) } // @Tags region // @Summary List all regions of a specific store. // @Param id path integer true "Store Id" // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/store/{id} [get] func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) { @@ -463,7 +207,7 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) return } regions := rc.GetStoreRegions(uint64(id)) - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -471,14 +215,14 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) h.rd.Data(w, http.StatusOK, b) } -// @Tags region -// @Summary List regions belongs to the given keyspace ID. -// @Param keyspace_id query string true "Keyspace ID" -// @Param limit query integer false "Limit count" default(16) -// @Produce json -// @Success 200 {object} RegionsInfo -// @Failure 400 {string} string "The input is invalid." -// @Router /regions/keyspace/id/{id} [get] +// @Tags region +// @Summary List regions belongs to the given keyspace ID. +// @Param keyspace_id query string true "Keyspace ID" +// @Param limit query integer false "Limit count" default(16) +// @Produce json +// @Success 200 {object} response.RegionsInfo +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/keyspace/id/{id} [get] func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) vars := mux.Vars(r) @@ -511,7 +255,7 @@ func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Reque txnRegion := rc.ScanRegions(regionBound.TxnLeftBound, regionBound.TxnRightBound, limit-len(regions)) regions = append(regions, txnRegion...) } - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -522,7 +266,7 @@ func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Reque // @Tags region // @Summary List all regions that miss peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/miss-peer [get] func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, r *http.Request) { @@ -540,7 +284,7 @@ func (h *regionsHandler) getRegionsByType( h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -551,7 +295,7 @@ func (h *regionsHandler) getRegionsByType( // @Tags region // @Summary List all regions that has extra peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/extra-peer [get] func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, r *http.Request) { @@ -561,7 +305,7 @@ func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, r *http.Requ // @Tags region // @Summary List all regions that has pending peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/pending-peer [get] func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, r *http.Request) { @@ -571,7 +315,7 @@ func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, r *http.Re // @Tags region // @Summary List all regions that has down peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/down-peer [get] func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, r *http.Request) { @@ -581,7 +325,7 @@ func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, r *http.Reque // @Tags region // @Summary List all regions that has learner peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/learner-peer [get] func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Request) { @@ -591,7 +335,7 @@ func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Re // @Tags region // @Summary List all regions that has offline peer. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/offline-peer [get] func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Request) { @@ -601,7 +345,7 @@ func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Re // @Tags region // @Summary List all regions that are oversized. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/oversized-region [get] func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, r *http.Request) { @@ -611,7 +355,7 @@ func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, r *http.Requ // @Tags region // @Summary List all regions that are undersized. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/undersized-region [get] func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, r *http.Request) { @@ -621,7 +365,7 @@ func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, r *http.Req // @Tags region // @Summary List all empty regions. // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/empty-region [get] func (h *regionsHandler) GetEmptyRegions(w http.ResponseWriter, r *http.Request) { @@ -745,7 +489,7 @@ func (h *regionsHandler) GetRangeHoles(w http.ResponseWriter, r *http.Request) { // @Summary List sibling regions of a specific region. // @Param id path integer true "Region Id" // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Failure 404 {string} string "The region does not exist." // @Router /regions/sibling/{id} [get] @@ -765,7 +509,7 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } left, right := rc.GetAdjacentRegions(region) - b, err := marshalRegionsInfoJSON(r.Context(), []*core.RegionInfo{left, right}) + b, err := response.MarshalRegionsInfoJSON(r.Context(), []*core.RegionInfo{left, right}) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -782,7 +526,7 @@ const ( // @Summary List regions with the highest write flow. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/writeflow [get] func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.Request) { @@ -793,7 +537,7 @@ func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.R // @Summary List regions with the highest read flow. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/readflow [get] func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Request) { @@ -804,7 +548,7 @@ func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Re // @Summary List regions with the largest conf version. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/confver [get] func (h *regionsHandler) GetTopConfVerRegions(w http.ResponseWriter, r *http.Request) { @@ -817,7 +561,7 @@ func (h *regionsHandler) GetTopConfVerRegions(w http.ResponseWriter, r *http.Req // @Summary List regions with the largest version. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/version [get] func (h *regionsHandler) GetTopVersionRegions(w http.ResponseWriter, r *http.Request) { @@ -830,7 +574,7 @@ func (h *regionsHandler) GetTopVersionRegions(w http.ResponseWriter, r *http.Req // @Summary List regions with the largest size. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/size [get] func (h *regionsHandler) GetTopSizeRegions(w http.ResponseWriter, r *http.Request) { @@ -843,7 +587,7 @@ func (h *regionsHandler) GetTopSizeRegions(w http.ResponseWriter, r *http.Reques // @Summary List regions with the largest keys. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/keys [get] func (h *regionsHandler) GetTopKeysRegions(w http.ResponseWriter, r *http.Request) { @@ -856,7 +600,7 @@ func (h *regionsHandler) GetTopKeysRegions(w http.ResponseWriter, r *http.Reques // @Summary List regions with the highest CPU usage. // @Param limit query integer false "Limit count" default(16) // @Produce json -// @Success 200 {object} RegionsInfo +// @Success 200 {object} response.RegionsInfo // @Failure 400 {string} string "The input is invalid." // @Router /regions/cpu [get] func (h *regionsHandler) GetTopCPURegions(w http.ResponseWriter, r *http.Request) { @@ -955,7 +699,7 @@ func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, return } regions := TopNRegions(rc.GetRegions(), less, limit) - b, err := marshalRegionsInfoJSON(r.Context(), regions) + b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/region_test.go b/server/api/region_test.go index ffac1571430..cea702cd412 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -31,57 +31,12 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/utils/apiutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" ) -func TestPeer(t *testing.T) { - re := require.New(t) - peers := []*metapb.Peer{ - {Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, - {Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, - {Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, - {Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, - } - // float64 is the default numeric type for JSON - expected := []map[string]interface{}{ - {"id": float64(1), "store_id": float64(10), "role_name": "Voter"}, - {"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, - {"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, - {"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, - } - - data, err := json.Marshal(fromPeerSlice(peers)) - re.NoError(err) - var ret []map[string]interface{} - re.NoError(json.Unmarshal(data, &ret)) - re.Equal(expected, ret) -} - -func TestPeerStats(t *testing.T) { - re := require.New(t) - peers := []*pdpb.PeerStats{ - {Peer: &metapb.Peer{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, DownSeconds: 0}, - {Peer: &metapb.Peer{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, DownSeconds: 1}, - {Peer: &metapb.Peer{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, DownSeconds: 2}, - {Peer: &metapb.Peer{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, DownSeconds: 3}, - } - // float64 is the default numeric type for JSON - expected := []map[string]interface{}{ - {"peer": map[string]interface{}{"id": float64(1), "store_id": float64(10), "role_name": "Voter"}}, - {"peer": map[string]interface{}{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, "down_seconds": float64(1)}, - {"peer": map[string]interface{}{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, "down_seconds": float64(2)}, - {"peer": map[string]interface{}{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, "down_seconds": float64(3)}, - } - - data, err := json.Marshal(fromPeerStatsSlice(peers)) - re.NoError(err) - var ret []map[string]interface{} - re.NoError(json.Unmarshal(data, &ret)) - re.Equal(expected, ret) -} - type regionTestSuite struct { suite.Suite svr *server.Server @@ -123,26 +78,26 @@ func (suite *regionTestSuite) TestRegion() { re := suite.Require() mustRegionHeartbeat(re, suite.svr, r) url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID()) - r1 := &RegionInfo{} + r1 := &response.RegionInfo{} r1m := make(map[string]interface{}) suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1)) r1.Adjust() - suite.Equal(NewAPIRegionInfo(r), r1) - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r1m)) - suite.Equal(float64(r.GetBytesWritten()), r1m["written_bytes"].(float64)) - suite.Equal(float64(r.GetKeysWritten()), r1m["written_keys"].(float64)) - suite.Equal(float64(r.GetBytesRead()), r1m["read_bytes"].(float64)) - suite.Equal(float64(r.GetKeysRead()), r1m["read_keys"].(float64)) + re.Equal(response.NewAPIRegionInfo(r), r1) + re.NoError(tu.ReadGetJSON(re, testDialClient, url, &r1m)) + re.Equal(float64(r.GetBytesWritten()), r1m["written_bytes"].(float64)) + re.Equal(float64(r.GetKeysWritten()), r1m["written_keys"].(float64)) + re.Equal(float64(r.GetBytesRead()), r1m["read_bytes"].(float64)) + re.Equal(float64(r.GetKeysRead()), r1m["read_keys"].(float64)) keys := r1m["buckets"].([]interface{}) suite.Len(keys, 2) suite.Equal(core.HexRegionKeyStr([]byte("a")), keys[0].(string)) suite.Equal(core.HexRegionKeyStr([]byte("b")), keys[1].(string)) url = fmt.Sprintf("%s/region/key/%s", suite.urlPrefix, "a") - r2 := &RegionInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) + r2 := &response.RegionInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) r2.Adjust() - suite.Equal(NewAPIRegionInfo(r), r2) + re.Equal(response.NewAPIRegionInfo(r), r2) } func (suite *regionTestSuite) TestRegionCheck() { @@ -157,36 +112,36 @@ func (suite *regionTestSuite) TestRegionCheck() { re := suite.Require() mustRegionHeartbeat(re, suite.svr, r) url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID()) - r1 := &RegionInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r1)) + r1 := &response.RegionInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r1)) r1.Adjust() - suite.Equal(NewAPIRegionInfo(r), r1) + re.Equal(response.NewAPIRegionInfo(r), r1) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "down-peer") - r2 := &RegionsInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) + r2 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) r2.Adjust() - suite.Equal(&RegionsInfo{Count: 1, Regions: []RegionInfo{*NewAPIRegionInfo(r)}}, r2) + re.Equal(&response.RegionsInfo{Count: 1, Regions: []response.RegionInfo{*response.NewAPIRegionInfo(r)}}, r2) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "pending-peer") - r3 := &RegionsInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r3)) + r3 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r3)) r3.Adjust() - suite.Equal(&RegionsInfo{Count: 1, Regions: []RegionInfo{*NewAPIRegionInfo(r)}}, r3) + re.Equal(&response.RegionsInfo{Count: 1, Regions: []response.RegionInfo{*response.NewAPIRegionInfo(r)}}, r3) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer") - r4 := &RegionsInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r4)) + r4 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r4)) r4.Adjust() - suite.Equal(&RegionsInfo{Count: 0, Regions: []RegionInfo{}}, r4) + re.Equal(&response.RegionsInfo{Count: 0, Regions: []response.RegionInfo{}}, r4) r = r.Clone(core.SetApproximateSize(1)) mustRegionHeartbeat(re, suite.svr, r) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "empty-region") - r5 := &RegionsInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r5)) + r5 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r5)) r5.Adjust() - suite.Equal(&RegionsInfo{Count: 1, Regions: []RegionInfo{*NewAPIRegionInfo(r)}}, r5) + re.Equal(&response.RegionsInfo{Count: 1, Regions: []response.RegionInfo{*response.NewAPIRegionInfo(r)}}, r5) r = r.Clone(core.SetApproximateSize(1)) mustRegionHeartbeat(re, suite.svr, r) @@ -207,31 +162,31 @@ func (suite *regionTestSuite) TestRegionCheck() { mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{}) mustRegionHeartbeat(re, suite.svr, r) url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer") - r8 := &RegionsInfo{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r8)) + r8 := &response.RegionsInfo{} + re.NoError(tu.ReadGetJSON(re, testDialClient, url, r8)) r4.Adjust() suite.Equal(1, r8.Count) suite.Equal(r.GetID(), r8.Regions[0].ID) } func (suite *regionTestSuite) TestRegions() { - r := NewAPIRegionInfo(core.NewRegionInfo(&metapb.Region{Id: 1}, nil)) - suite.Nil(r.Leader.Peer) - suite.Len(r.Leader.RoleName, 0) + re := suite.Require() + r := response.NewAPIRegionInfo(core.NewRegionInfo(&metapb.Region{Id: 1}, nil)) + re.Nil(r.Leader.Peer) + re.Empty(r.Leader.RoleName) rs := []*core.RegionInfo{ core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b"), core.SetApproximateKeys(10), core.SetApproximateSize(10)), core.NewTestRegionInfo(3, 1, []byte("b"), []byte("c"), core.SetApproximateKeys(10), core.SetApproximateSize(10)), core.NewTestRegionInfo(4, 2, []byte("c"), []byte("d"), core.SetApproximateKeys(10), core.SetApproximateSize(10)), } - regions := make([]RegionInfo, 0, len(rs)) - re := suite.Require() + regions := make([]response.RegionInfo, 0, len(rs)) for _, r := range rs { - regions = append(regions, *NewAPIRegionInfo(r)) + regions = append(regions, *response.NewAPIRegionInfo(r)) mustRegionHeartbeat(re, suite.svr, r) } url := fmt.Sprintf("%s/regions", suite.urlPrefix) - regionsInfo := &RegionsInfo{} + regionsInfo := &response.RegionsInfo{} err := tu.ReadGetJSON(re, testDialClient, url, regionsInfo) suite.NoError(err) suite.Len(regions, regionsInfo.Count) @@ -256,7 +211,7 @@ func (suite *regionTestSuite) TestStoreRegions() { regionIDs := []uint64{2, 3} url := fmt.Sprintf("%s/regions/store/%d", suite.urlPrefix, 1) - r4 := &RegionsInfo{} + r4 := &response.RegionsInfo{} err := tu.ReadGetJSON(re, testDialClient, url, r4) suite.NoError(err) suite.Len(regionIDs, r4.Count) @@ -267,7 +222,7 @@ func (suite *regionTestSuite) TestStoreRegions() { regionIDs = []uint64{4} url = fmt.Sprintf("%s/regions/store/%d", suite.urlPrefix, 2) - r5 := &RegionsInfo{} + r5 := &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, r5) suite.NoError(err) suite.Len(regionIDs, r5.Count) @@ -277,7 +232,7 @@ func (suite *regionTestSuite) TestStoreRegions() { regionIDs = []uint64{} url = fmt.Sprintf("%s/regions/store/%d", suite.urlPrefix, 3) - r6 := &RegionsInfo{} + r6 := &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, r6) suite.NoError(err) suite.Len(regionIDs, r6.Count) @@ -328,10 +283,11 @@ func (suite *regionTestSuite) TestTop() { } func (suite *regionTestSuite) checkTopRegions(url string, regionIDs []uint64) { - regions := &RegionsInfo{} - err := tu.ReadGetJSON(suite.Require(), testDialClient, url, regions) - suite.NoError(err) - suite.Len(regionIDs, regions.Count) + regions := &response.RegionsInfo{} + re := suite.Require() + err := tu.ReadGetJSON(re, testDialClient, url, regions) + re.NoError(err) + re.Len(regionIDs, regions.Count) for i, r := range regions.Regions { suite.Equal(regionIDs[i], r.ID) } @@ -423,7 +379,7 @@ func (suite *getRegionTestSuite) TestRegionKey() { r := core.NewTestRegionInfo(99, 1, []byte{0xFF, 0xFF, 0xAA}, []byte{0xFF, 0xFF, 0xCC}, core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) mustRegionHeartbeat(re, suite.svr, r) url := fmt.Sprintf("%s/region/key/%s", suite.urlPrefix, url.QueryEscape(string([]byte{0xFF, 0xFF, 0xBB}))) - RegionInfo := &RegionInfo{} + RegionInfo := &response.RegionInfo{} err := tu.ReadGetJSON(re, testDialClient, url, RegionInfo) suite.NoError(err) suite.Equal(RegionInfo.ID, r.GetID()) @@ -444,7 +400,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { url := fmt.Sprintf("%s/regions/key?key=%s", suite.urlPrefix, "b") regionIDs := []uint64{3, 4, 5, 99} - regions := &RegionsInfo{} + regions := &response.RegionsInfo{} err := tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -453,7 +409,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { } url = fmt.Sprintf("%s/regions/key?key=%s", suite.urlPrefix, "d") regionIDs = []uint64{4, 5, 99} - regions = &RegionsInfo{} + regions = &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -462,7 +418,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { } url = fmt.Sprintf("%s/regions/key?key=%s", suite.urlPrefix, "g") regionIDs = []uint64{5, 99} - regions = &RegionsInfo{} + regions = &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -471,7 +427,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { } url = fmt.Sprintf("%s/regions/key?end_key=%s", suite.urlPrefix, "e") regionIDs = []uint64{2, 3, 4} - regions = &RegionsInfo{} + regions = &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -480,7 +436,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { } url = fmt.Sprintf("%s/regions/key?key=%s&end_key=%s", suite.urlPrefix, "b", "g") regionIDs = []uint64{3, 4} - regions = &RegionsInfo{} + regions = &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -489,7 +445,7 @@ func (suite *getRegionTestSuite) TestScanRegionByKeys() { } url = fmt.Sprintf("%s/regions/key?key=%s&end_key=%s", suite.urlPrefix, "b", []byte{0xFF, 0xFF, 0xCC}) regionIDs = []uint64{3, 4, 5, 99} - regions = &RegionsInfo{} + regions = &response.RegionsInfo{} err = tu.ReadGetJSON(re, testDialClient, url, regions) suite.NoError(err) suite.Len(regionIDs, regions.Count) @@ -597,9 +553,9 @@ func TestRegionsInfoMarshal(t *testing.T) { core.SetReadKeys(10), core.SetWrittenKeys(10)), }, } - regionsInfo := &RegionsInfo{} + regionsInfo := &response.RegionsInfo{} for _, regions := range cases { - b, err := marshalRegionsInfoJSON(context.Background(), regions) + b, err := response.MarshalRegionsInfoJSON(context.Background(), regions) re.NoError(err) err = json.Unmarshal(b, regionsInfo) re.NoError(err) @@ -640,7 +596,7 @@ func BenchmarkGetRegions(b *testing.B) { mustRegionHeartbeat(re, svr, r) } resp, _ := apiutil.GetJSON(testDialClient, url, nil) - regions := &RegionsInfo{} + regions := &response.RegionsInfo{} err := json.NewDecoder(resp.Body).Decode(regions) re.NoError(err) re.Equal(regionCount, regions.Count) diff --git a/server/api/store.go b/server/api/store.go index 1fcae24482b..7e60aec8fda 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -26,139 +26,15 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/response" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/unrolled/render" ) -// MetaStore contains meta information about a store. -type MetaStore struct { - *metapb.Store - StateName string `json:"state_name"` -} - -// SlowTrend contains slow trend information about a store. -type SlowTrend struct { - // CauseValue is the slow trend detecting raw input, it changes by the performance and pressure along time of the store. - // The value itself is not important, what matter is: - // - The comparition result from store to store. - // - The change magnitude along time (represented by CauseRate). - // Currently it's one of store's internal latency (duration of waiting in the task queue of raftstore.store). - CauseValue float64 `json:"cause_value"` - // CauseRate is for mesuring the change magnitude of CauseValue of the store, - // - CauseRate > 0 means the store is become slower currently - // - CauseRate < 0 means the store is become faster currently - // - CauseRate == 0 means the store's performance and pressure does not have significant changes - CauseRate float64 `json:"cause_rate"` - // ResultValue is the current gRPC QPS of the store. - ResultValue float64 `json:"result_value"` - // ResultRate is for mesuring the change magnitude of ResultValue of the store. - ResultRate float64 `json:"result_rate"` -} - -// StoreStatus contains status about a store. -type StoreStatus struct { - Capacity typeutil.ByteSize `json:"capacity"` - Available typeutil.ByteSize `json:"available"` - UsedSize typeutil.ByteSize `json:"used_size"` - LeaderCount int `json:"leader_count"` - LeaderWeight float64 `json:"leader_weight"` - LeaderScore float64 `json:"leader_score"` - LeaderSize int64 `json:"leader_size"` - RegionCount int `json:"region_count"` - RegionWeight float64 `json:"region_weight"` - RegionScore float64 `json:"region_score"` - RegionSize int64 `json:"region_size"` - LearnerCount int `json:"learner_count,omitempty"` - WitnessCount int `json:"witness_count,omitempty"` - SlowScore uint64 `json:"slow_score,omitempty"` - SlowTrend *SlowTrend `json:"slow_trend,omitempty"` - SendingSnapCount uint32 `json:"sending_snap_count,omitempty"` - ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"` - IsBusy bool `json:"is_busy,omitempty"` - StartTS *time.Time `json:"start_ts,omitempty"` - LastHeartbeatTS *time.Time `json:"last_heartbeat_ts,omitempty"` - Uptime *typeutil.Duration `json:"uptime,omitempty"` -} - -// StoreInfo contains information about a store. -type StoreInfo struct { - Store *MetaStore `json:"store"` - Status *StoreStatus `json:"status"` -} - -const ( - disconnectedName = "Disconnected" - downStateName = "Down" -) - -func newStoreInfo(opt *sc.ScheduleConfig, store *core.StoreInfo) *StoreInfo { - var slowTrend *SlowTrend - coreSlowTrend := store.GetSlowTrend() - if coreSlowTrend != nil { - slowTrend = &SlowTrend{coreSlowTrend.CauseValue, coreSlowTrend.CauseRate, coreSlowTrend.ResultValue, coreSlowTrend.ResultRate} - } - s := &StoreInfo{ - Store: &MetaStore{ - Store: store.GetMeta(), - StateName: store.GetState().String(), - }, - Status: &StoreStatus{ - Capacity: typeutil.ByteSize(store.GetCapacity()), - Available: typeutil.ByteSize(store.GetAvailable()), - UsedSize: typeutil.ByteSize(store.GetUsedSize()), - LeaderCount: store.GetLeaderCount(), - LeaderWeight: store.GetLeaderWeight(), - LeaderScore: store.LeaderScore(constant.StringToSchedulePolicy(opt.LeaderSchedulePolicy), 0), - LeaderSize: store.GetLeaderSize(), - RegionCount: store.GetRegionCount(), - RegionWeight: store.GetRegionWeight(), - RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0), - RegionSize: store.GetRegionSize(), - LearnerCount: store.GetLearnerCount(), - WitnessCount: store.GetWitnessCount(), - SlowScore: store.GetSlowScore(), - SlowTrend: slowTrend, - SendingSnapCount: store.GetSendingSnapCount(), - ReceivingSnapCount: store.GetReceivingSnapCount(), - IsBusy: store.IsBusy(), - }, - } - - if store.GetStoreStats() != nil { - startTS := store.GetStartTime() - s.Status.StartTS = &startTS - } - if lastHeartbeat := store.GetLastHeartbeatTS(); !lastHeartbeat.IsZero() { - s.Status.LastHeartbeatTS = &lastHeartbeat - } - if upTime := store.GetUptime(); upTime > 0 { - duration := typeutil.NewDuration(upTime) - s.Status.Uptime = &duration - } - - if store.GetState() == metapb.StoreState_Up { - if store.DownTime() > opt.MaxStoreDownTime.Duration { - s.Store.StateName = downStateName - } else if store.IsDisconnected() { - s.Store.StateName = disconnectedName - } - } - return s -} - -// StoresInfo records stores' info. -type StoresInfo struct { - Count int `json:"count"` - Stores []*StoreInfo `json:"stores"` -} - type storeHandler struct { handler *server.Handler rd *render.Render @@ -174,8 +50,8 @@ func newStoreHandler(handler *server.Handler, rd *render.Render) *storeHandler { // @Tags store // @Summary Get a store's information. // @Param id path integer true "Store Id" -// @Produce json -// @Success 200 {object} StoreInfo +// @Produce json +// @Success 200 {object} response.StoreInfo // @Failure 400 {string} string "The input is invalid." // @Failure 404 {string} string "The store does not exist." // @Failure 500 {string} string "PD server failed to proceed the request." @@ -195,7 +71,7 @@ func (h *storeHandler) GetStore(w http.ResponseWriter, r *http.Request) { return } - storeInfo := newStoreInfo(h.handler.GetScheduleConfig(), store) + storeInfo := response.BuildStoreInfo(h.handler.GetScheduleConfig(), store) h.rd.JSON(w, http.StatusOK, storeInfo) } @@ -737,14 +613,14 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request // @Summary Get stores in the cluster. // @Param state query array true "Specify accepted store states." // @Produce json -// @Success 200 {object} StoresInfo +// @Success 200 {object} response.StoresInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /stores [get] func (h *storesHandler) GetStores(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) stores := rc.GetMetaStores() - StoresInfo := &StoresInfo{ - Stores: make([]*StoreInfo, 0, len(stores)), + StoresInfo := &response.StoresInfo{ + Stores: make([]*response.StoreInfo, 0, len(stores)), } urlFilter, err := newStoreStateFilter(r.URL) @@ -762,7 +638,7 @@ func (h *storesHandler) GetStores(w http.ResponseWriter, r *http.Request) { return } - storeInfo := newStoreInfo(h.GetScheduleConfig(), store) + storeInfo := response.BuildStoreInfo(h.GetScheduleConfig(), store) StoresInfo.Stores = append(StoresInfo.Stores, storeInfo) } StoresInfo.Count = len(StoresInfo.Stores) diff --git a/server/api/store_test.go b/server/api/store_test.go index a1a4fbcfaae..215f89c8619 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" @@ -115,7 +116,7 @@ func (suite *storeTestSuite) TearDownSuite() { suite.cleanup() } -func checkStoresInfo(re *require.Assertions, ss []*StoreInfo, want []*metapb.Store) { +func checkStoresInfo(re *require.Assertions, ss []*response.StoreInfo, want []*metapb.Store) { re.Len(ss, len(want)) mapWant := make(map[uint64]*metapb.Store) for _, s := range want { @@ -134,23 +135,35 @@ func checkStoresInfo(re *require.Assertions, ss []*StoreInfo, want []*metapb.Sto func (suite *storeTestSuite) TestStoresList() { url := fmt.Sprintf("%s/stores", suite.urlPrefix) - info := new(StoresInfo) + info := new(response.StoresInfo) re := suite.Require() err := tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:3]) url = fmt.Sprintf("%s/stores?state=0", suite.urlPrefix) - info = new(StoresInfo) + info = new(response.StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:2]) url = fmt.Sprintf("%s/stores?state=1", suite.urlPrefix) - info = new(StoresInfo) + info = new(response.StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[2:3]) + + url = fmt.Sprintf("%s/stores?state=2", suite.urlPrefix) + info = new(response.StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + re.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[3:]) + + url = fmt.Sprintf("%s/stores?state=2&state=1", suite.urlPrefix) + info = new(response.StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + re.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[2:]) } func (suite *storeTestSuite) TestStoreGet() { @@ -166,20 +179,20 @@ func (suite *storeTestSuite) TestStoreGet() { }, }, ) - info := new(StoreInfo) + info := new(response.StoreInfo) err := tu.ReadGetJSON(suite.Require(), testDialClient, url, info) suite.NoError(err) capacity, _ := units.RAMInBytes("1.636TiB") available, _ := units.RAMInBytes("1.555TiB") suite.Equal(capacity, int64(info.Status.Capacity)) suite.Equal(available, int64(info.Status.Available)) - checkStoresInfo(suite.Require(), []*StoreInfo{info}, suite.stores[:1]) + checkStoresInfo(suite.Require(), []*response.StoreInfo{info}, suite.stores[:1]) } func (suite *storeTestSuite) TestStoreLabel() { url := fmt.Sprintf("%s/store/1", suite.urlPrefix) re := suite.Require() - var info StoreInfo + var info response.StoreInfo err := tu.ReadGetJSON(re, testDialClient, url, &info) suite.NoError(err) suite.Empty(info.Store.Labels) @@ -276,7 +289,7 @@ func (suite *storeTestSuite) TestStoreDelete() { } // store 6 origin status:offline url := fmt.Sprintf("%s/store/6", suite.urlPrefix) - store := new(StoreInfo) + store := new(response.StoreInfo) err := tu.ReadGetJSON(re, testDialClient, url, store) suite.NoError(err) suite.False(store.Store.PhysicallyDestroyed) @@ -288,7 +301,7 @@ func (suite *storeTestSuite) TestStoreDelete() { status = suite.requestStatusBody(testDialClient, http.MethodGet, url) suite.Equal(http.StatusOK, status) - store = new(StoreInfo) + store = new(response.StoreInfo) err = tu.ReadGetJSON(re, testDialClient, url, store) suite.NoError(err) suite.Equal(metapb.StoreState_Up, store.Store.State) @@ -317,13 +330,13 @@ func (suite *storeTestSuite) TestStoreSetState() { mustPutStore(re, suite.svr, uint64(id), metapb.StoreState_Up, metapb.NodeState_Serving, nil) } url := fmt.Sprintf("%s/store/1", suite.urlPrefix) - info := StoreInfo{} + info := response.StoreInfo{} err := tu.ReadGetJSON(re, testDialClient, url, &info) suite.NoError(err) suite.Equal(metapb.StoreState_Up, info.Store.State) // Set to Offline. - info = StoreInfo{} + info = response.StoreInfo{} err = tu.CheckPostJSON(testDialClient, url+"/state?state=Offline", nil, tu.StatusOK(re)) suite.NoError(err) err = tu.ReadGetJSON(re, testDialClient, url, &info) @@ -331,14 +344,14 @@ func (suite *storeTestSuite) TestStoreSetState() { suite.Equal(metapb.StoreState_Offline, info.Store.State) // store not found - info = StoreInfo{} + info = response.StoreInfo{} err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/store/10086/state?state=Offline", nil, tu.StatusNotOK(re)) suite.NoError(err) // Invalid state. invalidStates := []string{"Foo", "Tombstone"} for _, state := range invalidStates { - info = StoreInfo{} + info = response.StoreInfo{} err = tu.CheckPostJSON(testDialClient, url+"/state?state="+state, nil, tu.StatusNotOK(re)) suite.NoError(err) err := tu.ReadGetJSON(re, testDialClient, url, &info) @@ -347,7 +360,7 @@ func (suite *storeTestSuite) TestStoreSetState() { } // Set back to Up. - info = StoreInfo{} + info = response.StoreInfo{} err = tu.CheckPostJSON(testDialClient, url+"/state?state=Up", nil, tu.StatusOK(re)) suite.NoError(err) err = tu.ReadGetJSON(re, testDialClient, url, &info) @@ -407,16 +420,16 @@ func (suite *storeTestSuite) TestDownState() { core.SetStoreStats(&pdpb.StoreStats{}), core.SetLastHeartbeatTS(time.Now()), ) - storeInfo := newStoreInfo(suite.svr.GetScheduleConfig(), store) + storeInfo := response.BuildStoreInfo(suite.svr.GetScheduleConfig(), store) suite.Equal(metapb.StoreState_Up.String(), storeInfo.Store.StateName) newStore := store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-time.Minute * 2))) - storeInfo = newStoreInfo(suite.svr.GetScheduleConfig(), newStore) - suite.Equal(disconnectedName, storeInfo.Store.StateName) + storeInfo = response.BuildStoreInfo(suite.svr.GetScheduleConfig(), newStore) + suite.Equal(response.DisconnectedName, storeInfo.Store.StateName) newStore = store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-time.Hour * 2))) - storeInfo = newStoreInfo(suite.svr.GetScheduleConfig(), newStore) - suite.Equal(downStateName, storeInfo.Store.StateName) + storeInfo = response.BuildStoreInfo(suite.svr.GetScheduleConfig(), newStore) + suite.Equal(response.DownStateName, storeInfo.Store.StateName) } func (suite *storeTestSuite) TestGetAllLimit() { diff --git a/server/api/trend.go b/server/api/trend.go index d75086d267d..2775cf33393 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -18,6 +18,7 @@ import ( "net/http" "time" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/apiutil" @@ -136,7 +137,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { } trendStores := make([]trendStore, 0, len(stores)) for _, store := range stores { - info := newStoreInfo(h.svr.GetScheduleConfig(), store) + info := response.BuildStoreInfo(h.svr.GetScheduleConfig(), store) s := trendStore{ ID: info.Store.GetId(), Address: info.Store.GetAddress(), diff --git a/server/forward.go b/server/forward.go index 4be03cbc2e9..47cc43e9f34 100644 --- a/server/forward.go +++ b/server/forward.go @@ -89,6 +89,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { forwardStream tsopb.TSO_TsoClient forwardCtx context.Context cancelForward context.CancelFunc + tsoStreamErr error lastForwardedHost string ) defer func() { @@ -96,6 +97,9 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { if cancelForward != nil { cancelForward() } + if grpcutil.NeedRebuildConnection(tsoStreamErr) { + s.closeDelegateClient(lastForwardedHost) + } }() maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings()) @@ -131,7 +135,8 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { forwardedHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.TSOServiceName) if !ok || len(forwardedHost) == 0 { - return errors.WithStack(ErrNotFoundTSOAddr) + tsoStreamErr = errors.WithStack(ErrNotFoundTSOAddr) + return tsoStreamErr } if forwardStream == nil || lastForwardedHost != forwardedHost { if cancelForward != nil { @@ -140,18 +145,21 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) if err != nil { - return errors.WithStack(err) + tsoStreamErr = errors.WithStack(err) + return tsoStreamErr } forwardStream, forwardCtx, cancelForward, err = s.createTSOForwardStream(stream.Context(), clientConn) if err != nil { - return errors.WithStack(err) + tsoStreamErr = errors.WithStack(err) + return tsoStreamErr } lastForwardedHost = forwardedHost } tsopbResp, err := s.forwardTSORequestWithDeadLine(forwardCtx, cancelForward, forwardStream, request, tsDeadlineCh) if err != nil { - return errors.WithStack(err) + tsoStreamErr = errors.WithStack(err) + return tsoStreamErr } // The error types defined for tsopb and pdpb are different, so we need to convert them. @@ -363,25 +371,13 @@ func (s *GrpcServer) getDelegateClient(ctx context.Context, forwardedHost string return conn.(*grpc.ClientConn), nil } -func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context, serviceName ...string) (forwardedHost string, err error) { - if s.IsAPIServiceMode() { - var ok bool - if len(serviceName) == 0 { - return "", ErrNotFoundService - } - forwardedHost, ok = s.GetServicePrimaryAddr(ctx, serviceName[0]) - if !ok || len(forwardedHost) == 0 { - switch serviceName[0] { - case utils.TSOServiceName: - return "", ErrNotFoundTSOAddr - case utils.SchedulingServiceName: - return "", ErrNotFoundSchedulingAddr - } - } - } else if fh := grpcutil.GetForwardedHost(streamCtx); !s.isLocalRequest(fh) { - forwardedHost = fh +func (s *GrpcServer) closeDelegateClient(forwardedHost string) { + client, ok := s.clientConns.LoadAndDelete(forwardedHost) + if !ok { + return } - return forwardedHost, nil + client.(*grpc.ClientConn).Close() + log.Debug("close delegate client connection", zap.String("forwarded-host", forwardedHost)) } func (s *GrpcServer) isLocalRequest(forwardedHost string) bool { diff --git a/server/grpc_service.go b/server/grpc_service.go index 3d3cdb0384f..7f007c250fc 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -502,9 +502,8 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { return errors.WithStack(err) } - if forwardedHost, err := s.getForwardedHost(ctx, stream.Context(), utils.TSOServiceName); err != nil { - return err - } else if len(forwardedHost) > 0 { + forwardedHost := grpcutil.GetForwardedHost(stream.Context()) + if !s.isLocalRequest(forwardedHost) { clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) if err != nil { return errors.WithStack(err) @@ -1181,6 +1180,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error if cancel != nil { cancel() } + client, err := s.getDelegateClient(s.ctx, forwardedSchedulingHost) if err != nil { errRegionHeartbeatClient.Inc() @@ -1219,6 +1219,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error } if err := forwardSchedulingStream.Send(schedulingpbReq); err != nil { forwardSchedulingStream = nil + if grpcutil.NeedRebuildConnection(err) { + s.closeDelegateClient(lastForwardedSchedulingHost) + } errRegionHeartbeatSend.Inc() log.Error("failed to send request to scheduling service", zap.Error(err)) } diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 742b83f8ba9..82bf8b941a6 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" @@ -601,3 +602,120 @@ func TestFollowerForward(t *testing.T) { env := tests.NewSchedulingTestEnvironment(t) env.RunTestInTwoModes(checkFollowerForward) } + +func (suite *apiTestSuite) TestStores() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInAPIMode(suite.checkStores) +} + +func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { + re := suite.Require() + stores := []*metapb.Store{ + { + // metapb.StoreState_Up == 0 + Id: 1, + Address: "tikv1", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "2.0.0", + }, + { + Id: 4, + Address: "tikv4", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "2.0.0", + }, + { + // metapb.StoreState_Offline == 1 + Id: 6, + Address: "tikv6", + State: metapb.StoreState_Offline, + NodeState: metapb.NodeState_Removing, + Version: "2.0.0", + }, + { + // metapb.StoreState_Tombstone == 2 + Id: 7, + Address: "tikv7", + State: metapb.StoreState_Tombstone, + NodeState: metapb.NodeState_Removed, + Version: "2.0.0", + }, + } + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + // Test /stores + apiServerAddr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", apiServerAddr) + var resp map[string]interface{} + err := testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal(3, int(resp["count"].(float64))) + re.Len(resp["stores"].([]interface{}), 3) + scheServerAddr := cluster.GetSchedulingPrimaryServer().GetAddr() + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/stores", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal(3, int(resp["count"].(float64))) + re.Len(resp["stores"].([]interface{}), 3) + // Test /stores/{id} + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/stores/1", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal("tikv1", resp["store"].(map[string]interface{})["address"]) + re.Equal("Up", resp["store"].(map[string]interface{})["state_name"]) + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/stores/6", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal("tikv6", resp["store"].(map[string]interface{})["address"]) + re.Equal("Offline", resp["store"].(map[string]interface{})["state_name"]) + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/stores/7", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal("tikv7", resp["store"].(map[string]interface{})["address"]) + re.Equal("Tombstone", resp["store"].(map[string]interface{})["state_name"]) + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/stores/233", scheServerAddr) + testutil.CheckGetJSON(testDialClient, urlPrefix, nil, + testutil.Status(re, http.StatusNotFound), testutil.StringContain(re, "not found")) +} + +func (suite *apiTestSuite) TestRegions() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInAPIMode(suite.checkRegions) +} + +func (suite *apiTestSuite) checkRegions(cluster *tests.TestCluster) { + re := suite.Require() + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d")) + tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f")) + // Test /regions + apiServerAddr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/regions", apiServerAddr) + var resp map[string]interface{} + err := testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal(3, int(resp["count"].(float64))) + re.Len(resp["regions"].([]interface{}), 3) + scheServerAddr := cluster.GetSchedulingPrimaryServer().GetAddr() + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/regions", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal(3, int(resp["count"].(float64))) + re.Len(resp["regions"].([]interface{}), 3) + // Test /regions/{id} and /regions/count + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/regions/1", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + key := fmt.Sprintf("%x", "a") + re.Equal(key, resp["start_key"]) + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/regions/count", scheServerAddr) + err = testutil.ReadGetJSON(re, testDialClient, urlPrefix, &resp) + re.NoError(err) + re.Equal(3., resp["count"]) + urlPrefix = fmt.Sprintf("%s/scheduling/api/v1/regions/233", scheServerAddr) + testutil.CheckGetJSON(testDialClient, urlPrefix, nil, + testutil.Status(re, http.StatusNotFound), testutil.StringContain(re, "not found")) +} diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 3912cdfef7c..ab1ff4f63cd 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -21,8 +21,8 @@ import ( "github.com/spf13/cobra" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/server/api" ) // ExecuteCommand is used for test purpose. @@ -36,9 +36,9 @@ func ExecuteCommand(root *cobra.Command, args ...string) (output []byte, err err // CheckStoresInfo is used to check the test results. // CheckStoresInfo will not check Store.State because this field has been omitted pd-ctl output -func CheckStoresInfo(re *require.Assertions, stores []*api.StoreInfo, want []*api.StoreInfo) { +func CheckStoresInfo(re *require.Assertions, stores []*response.StoreInfo, want []*response.StoreInfo) { re.Len(stores, len(want)) - mapWant := make(map[uint64]*api.StoreInfo) + mapWant := make(map[uint64]*response.StoreInfo) for _, s := range want { if _, ok := mapWant[s.Store.Id]; !ok { mapWant[s.Store.Id] = s @@ -61,14 +61,14 @@ func CheckStoresInfo(re *require.Assertions, stores []*api.StoreInfo, want []*ap } // CheckRegionInfo is used to check the test results. -func CheckRegionInfo(re *require.Assertions, output *api.RegionInfo, expected *core.RegionInfo) { - region := api.NewAPIRegionInfo(expected) +func CheckRegionInfo(re *require.Assertions, output *response.RegionInfo, expected *core.RegionInfo) { + region := response.NewAPIRegionInfo(expected) output.Adjust() re.Equal(region, output) } // CheckRegionsInfo is used to check the test results. -func CheckRegionsInfo(re *require.Assertions, output *api.RegionsInfo, expected []*core.RegionInfo) { +func CheckRegionsInfo(re *require.Assertions, output *response.RegionsInfo, expected []*core.RegionInfo) { re.Len(expected, output.Count) got := output.Regions sort.Slice(got, func(i, j int) bool { diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index 9c64933a127..be311d5f99d 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" - "github.com/tikv/pd/server/api" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" @@ -42,9 +42,9 @@ func TestLabel(t *testing.T) { pdAddr := cluster.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() - stores := []*api.StoreInfo{ + stores := []*response.StoreInfo{ { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 1, State: metapb.StoreState_Up, @@ -60,7 +60,7 @@ func TestLabel(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 2, State: metapb.StoreState_Up, @@ -76,7 +76,7 @@ func TestLabel(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 3, State: metapb.StoreState_Up, @@ -128,9 +128,9 @@ func TestLabel(t *testing.T) { args = []string{"-u", pdAddr, "label", "store", "zone", "us-west"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storesInfo := new(api.StoresInfo) + storesInfo := new(response.StoresInfo) re.NoError(json.Unmarshal(output, &storesInfo)) - sss := []*api.StoreInfo{stores[0], stores[2]} + sss := []*response.StoreInfo{stores[0], stores[2]} pdctl.CheckStoresInfo(re, storesInfo.Stores, sss) // label isolation [label] diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index fe834ac1421..dfba4803ca0 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/server/api" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" @@ -168,7 +169,7 @@ func TestRegion(t *testing.T) { args := append([]string{"-u", pdAddr}, testCase.args...) output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - regions := &api.RegionsInfo{} + regions := &response.RegionsInfo{} re.NoError(json.Unmarshal(output, regions)) pdctl.CheckRegionsInfo(re, regions, testCase.expect) } @@ -191,7 +192,7 @@ func TestRegion(t *testing.T) { args := append([]string{"-u", pdAddr}, testCase.args...) output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - region := &api.RegionInfo{} + region := &response.RegionInfo{} re.NoError(json.Unmarshal(output, region)) pdctl.CheckRegionInfo(re, region, testCase.expect) } diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 13c7350bb6f..9723260bcbe 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -24,8 +24,8 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" + "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/server/api" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -43,9 +43,9 @@ func TestStore(t *testing.T) { pdAddr := cluster.GetConfig().GetClientURL() cmd := ctl.GetRootCmd() - stores := []*api.StoreInfo{ + stores := []*response.StoreInfo{ { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 1, State: metapb.StoreState_Up, @@ -56,7 +56,7 @@ func TestStore(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 3, State: metapb.StoreState_Up, @@ -67,7 +67,7 @@ func TestStore(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 2, State: metapb.StoreState_Tombstone, @@ -91,7 +91,7 @@ func TestStore(t *testing.T) { args := []string{"-u", pdAddr, "store"} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storesInfo := new(api.StoresInfo) + storesInfo := new(response.StoresInfo) re.NoError(json.Unmarshal(output, &storesInfo)) pdctl.CheckStoresInfo(re, storesInfo.Stores, stores[:2]) @@ -101,7 +101,7 @@ func TestStore(t *testing.T) { output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.NotContains(string(output), "\"state\":") - storesInfo = new(api.StoresInfo) + storesInfo = new(response.StoresInfo) re.NoError(json.Unmarshal(output, &storesInfo)) pdctl.CheckStoresInfo(re, storesInfo.Stores, stores) @@ -110,10 +110,10 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "1"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storeInfo := new(api.StoreInfo) + storeInfo := new(response.StoreInfo) re.NoError(json.Unmarshal(output, &storeInfo)) - pdctl.CheckStoresInfo(re, []*api.StoreInfo{storeInfo}, stores[:1]) + pdctl.CheckStoresInfo(re, []*response.StoreInfo{storeInfo}, stores[:1]) re.Nil(storeInfo.Store.Labels) // store label command @@ -162,7 +162,7 @@ func TestStore(t *testing.T) { args = testcase.newArgs } cmd := ctl.GetRootCmd() - storeInfo := new(api.StoreInfo) + storeInfo := new(response.StoreInfo) _, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) args = []string{"-u", pdAddr, "store", "1"} @@ -305,7 +305,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "1"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storeInfo = new(api.StoreInfo) + storeInfo = new(response.StoreInfo) re.NoError(json.Unmarshal(output, &storeInfo)) storeInfo.Store.State = metapb.StoreState(metapb.StoreState_value[storeInfo.Store.StateName]) @@ -338,7 +338,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "1"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storeInfo = new(api.StoreInfo) + storeInfo = new(response.StoreInfo) re.NoError(json.Unmarshal(output, &storeInfo)) re.Equal(metapb.StoreState_Up, storeInfo.Store.State) @@ -354,7 +354,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "3"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storeInfo = new(api.StoreInfo) + storeInfo = new(response.StoreInfo) re.NoError(json.Unmarshal(output, &storeInfo)) storeInfo.Store.State = metapb.StoreState(metapb.StoreState_value[storeInfo.Store.StateName]) @@ -370,7 +370,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "3"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storeInfo = new(api.StoreInfo) + storeInfo = new(response.StoreInfo) re.NoError(json.Unmarshal(output, &storeInfo)) re.Equal(metapb.StoreState_Up, storeInfo.Store.State) @@ -381,7 +381,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "check", "Tombstone"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storesInfo = new(api.StoresInfo) + storesInfo = new(response.StoresInfo) re.NoError(json.Unmarshal(output, &storesInfo)) re.Equal(1, storesInfo.Count) @@ -391,7 +391,7 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "check", "Tombstone"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - storesInfo = new(api.StoresInfo) + storesInfo = new(response.StoresInfo) re.NoError(json.Unmarshal(output, &storesInfo)) re.Equal(0, storesInfo.Count) @@ -470,9 +470,9 @@ func TestTombstoneStore(t *testing.T) { pdAddr := cluster.GetConfig().GetClientURL() cmd := ctl.GetRootCmd() - stores := []*api.StoreInfo{ + stores := []*response.StoreInfo{ { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 2, State: metapb.StoreState_Tombstone, @@ -483,7 +483,7 @@ func TestTombstoneStore(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 3, State: metapb.StoreState_Tombstone, @@ -494,7 +494,7 @@ func TestTombstoneStore(t *testing.T) { }, }, { - Store: &api.MetaStore{ + Store: &response.MetaStore{ Store: &metapb.Store{ Id: 4, State: metapb.StoreState_Tombstone, diff --git a/tools/pd-ctl/pdctl/command/label_command.go b/tools/pd-ctl/pdctl/command/label_command.go index 60762383241..c0ae3135210 100644 --- a/tools/pd-ctl/pdctl/command/label_command.go +++ b/tools/pd-ctl/pdctl/command/label_command.go @@ -21,9 +21,9 @@ import ( "github.com/spf13/cobra" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/response" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/server/api" ) var ( @@ -114,7 +114,7 @@ func getStores(cmd *cobra.Command, _ []string) ([]*core.StoreInfo, error) { if err != nil { return nil, err } - var storesInfo api.StoresInfo + var storesInfo response.StoresInfo if err := json.Unmarshal([]byte(body), &storesInfo); err != nil { return nil, err } @@ -125,13 +125,13 @@ func getStores(cmd *cobra.Command, _ []string) ([]*core.StoreInfo, error) { return stores, nil } -func getRegions(cmd *cobra.Command, _ []string) ([]api.RegionInfo, error) { +func getRegions(cmd *cobra.Command, _ []string) ([]response.RegionInfo, error) { prefix := regionsPrefix body, err := doRequest(cmd, prefix, http.MethodGet, http.Header{}) if err != nil { return nil, err } - var RegionsInfo api.RegionsInfo + var RegionsInfo response.RegionsInfo if err := json.Unmarshal([]byte(body), &RegionsInfo); err != nil { return nil, err } diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index 1dee1c13a72..9579b8f6f52 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/spf13/cobra" - "github.com/tikv/pd/server/api" + "github.com/tikv/pd/pkg/response" "golang.org/x/text/cases" "golang.org/x/text/language" ) @@ -280,7 +280,7 @@ func storeLimitSceneCommandFunc(cmd *cobra.Command, args []string) { } func convertToStoreInfo(content string) string { - store := &api.StoreInfo{} + store := &response.StoreInfo{} err := json.Unmarshal([]byte(content), store) if err != nil { return content @@ -296,7 +296,7 @@ func convertToStoreInfo(content string) string { } func convertToStoresInfo(content string) string { - stores := &api.StoresInfo{} + stores := &response.StoresInfo{} err := json.Unmarshal([]byte(content), stores) if err != nil { return content