Skip to content

Commit

Permalink
Merge branch 'master' into add-more-error
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored May 28, 2024
2 parents ee0a429 + b1cbc71 commit 53085ef
Show file tree
Hide file tree
Showing 47 changed files with 606 additions and 859 deletions.
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
membersPrefix = "/pd/api/v1/members"
leaderPrefix = "/pd/api/v1/leader"
transferLeader = "/pd/api/v1/leader/transfer"
health = "/pd/api/v1/health"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand Down
15 changes: 15 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Client interface {
GetStores(context.Context) (*StoresInfo, error)
GetStore(context.Context, uint64) (*StoreInfo, error)
SetStoreLabels(context.Context, int64, map[string]string) error
GetHealthStatus(context.Context) ([]Health, error)
/* Config-related interfaces */
GetConfig(context.Context) (map[string]any, error)
SetConfig(context.Context, map[string]any, ...float64) error
Expand Down Expand Up @@ -337,6 +338,20 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels
WithBody(jsonInput))
}

// GetHealthStatus gets the health status of the cluster.
func (c *client) GetHealthStatus(ctx context.Context) ([]Health, error) {
var healths []Health
err := c.request(ctx, newRequestInfo().
WithName(getHealthStatusName).
WithURI(health).
WithMethod(http.MethodGet).
WithResp(&healths))
if err != nil {
return nil, err
}
return healths, nil
}

// GetConfig gets the configurations.
func (c *client) GetConfig(ctx context.Context) (map[string]any, error) {
var config map[string]any
Expand Down
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
getStoresName = "GetStores"
getStoreName = "GetStore"
setStoreLabelsName = "SetStoreLabels"
getHealthStatusName = "GetHealthStatus"
getConfigName = "GetConfig"
setConfigName = "SetConfig"
getScheduleConfigName = "GetScheduleConfig"
Expand Down
9 changes: 9 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,12 @@ func stringToKeyspaceState(str string) (keyspacepb.KeyspaceState, error) {
return keyspacepb.KeyspaceState(0), fmt.Errorf("invalid KeyspaceState string: %s", str)
}
}

// Health reflects the cluster's health.
// NOTE: This type is moved from `server/api/health.go`, maybe move them to the same place later.
type Health struct {
Name string `json:"name"`
MemberID uint64 `json:"member_id"`
ClientUrls []string `json:"client_urls"`
Health bool `json:"health"`
}
7 changes: 1 addition & 6 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ type Cluster interface {
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region))
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

Expand Down
195 changes: 10 additions & 185 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,218 +14,43 @@

package core

import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
mu syncutil.RWMutex
*StoresInfo
}

*StoresInfo
*RegionsInfo
}

// NewBasicCluster creates a BasicCluster.
func NewBasicCluster() *BasicCluster {
return &BasicCluster{
Stores: struct {
mu syncutil.RWMutex
*StoresInfo
}{StoresInfo: NewStoresInfo()},

StoresInfo: NewStoresInfo(),
RegionsInfo: NewRegionsInfo(),
}
}

/* Stores read operations */

// GetStores returns all Stores in the cluster.
func (bc *BasicCluster) GetStores() []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStores()
}

// GetMetaStores gets a complete set of metapb.Store.
func (bc *BasicCluster) GetMetaStores() []*metapb.Store {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetMetaStores()
}

// GetStore searches for a store by ID.
func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(storeID)
}

// GetRegionStores returns all Stores that contains the region's peer.
func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetStoreIDs() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetNonWitnessVoterStores returns all Stores that contains the non-witness's voter peer.
func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetNonWitnessVoters() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetFollowerStores returns all Stores that contains the region's follower peer.
func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetFollowers() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetLeaderStore returns all Stores that contains the region's leader peer.
func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
}

// GetStoreCount returns the total count of storeInfo.
func (bc *BasicCluster) GetStoreCount() int {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStoreCount()
}

/* Stores Write operations */

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.PauseLeaderTransfer(storeID)
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResumeLeaderTransfer(storeID)
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowStoreEvicted(storeID)
}

// SlowTrendEvicted marks a store as a slow store by trend and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowTrendEvicted(storeID uint64) error {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
return bc.Stores.SlowTrendEvicted(storeID)
}

// SlowTrendRecovered cleans the evicted by slow trend state of a store.
func (bc *BasicCluster) SlowTrendRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowTrendRecovered(storeID)
}

// SlowStoreRecovered cleans the evicted state of a store.
func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SlowStoreRecovered(storeID)
}

// ResetStoreLimit resets the limit for a specific store.
func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.StoresInfo = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.DeleteStore(store)
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.GetStoreStats(storeID)
bc.StoresInfo.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

/* Regions read operations */

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo {
region := bc.RegionsInfo.GetRegion(regionID)
region := bc.GetRegion(regionID)
if region == nil || region.GetLeader() == nil {
return nil
}

bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
return bc.GetStore(region.GetLeader().GetStoreId())
}

func (bc *BasicCluster) getWriteRate(
f func(storeID uint64) (bytesRate, keysRate float64),
) (storeIDs []uint64, bytesRates, keysRates []float64) {
bc.Stores.mu.RLock()
count := len(bc.Stores.stores)
storeIDs = make([]uint64, 0, count)
for _, store := range bc.Stores.stores {
storeIDs = append(storeIDs, store.GetID())
}
bc.Stores.mu.RUnlock()
storeIDs = bc.GetStoreIDs()
count := len(storeIDs)
bytesRates = make([]float64, 0, count)
keysRates = make([]float64, 0, count)
for _, id := range storeIDs {
Expand All @@ -238,12 +63,12 @@ func (bc *BasicCluster) getWriteRate(

// GetStoresLeaderWriteRate get total write rate of each store's leaders.
func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
return bc.getWriteRate(bc.RegionsInfo.GetStoreLeaderWriteRate)
return bc.getWriteRate(bc.GetStoreLeaderWriteRate)
}

// GetStoresWriteRate get total write rate of each store's regions.
func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate)
return bc.getWriteRate(bc.GetStoreWriteRate)
}

// UpdateAllStoreStatus updates the information of all stores.
Expand Down
31 changes: 0 additions & 31 deletions pkg/core/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int {
}
return count
}

// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
loads []float64
interval uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
loads: loads,
interval: interval,
}
}

// GetLoads provides loads
func (p *PeerInfo) GetLoads() []float64 {
return p.loads
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
}

// GetInterval returns reporting interval
func (p *PeerInfo) GetInterval() uint64 {
return p.interval
}
Loading

0 comments on commit 53085ef

Please sign in to comment.