From 19c9852decda4cb49a2319b453c4f01c6a26014f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 May 2024 12:28:22 +0800 Subject: [PATCH] tools: support triggering an event through HTTP API (#5677) close tikv/pd#5451, ref tikv/pd#5468 Signed-off-by: Ryan Leung --- tools/pd-simulator/main.go | 30 +---- .../pd-simulator/simulator/cases/add_nodes.go | 71 ------------ .../simulator/cases/add_nodes_dynamic.go | 92 --------------- .../simulator/cases/balance_leader.go | 9 +- .../simulator/cases/balance_region.go | 7 +- tools/pd-simulator/simulator/cases/cases.go | 16 --- .../simulator/cases/delete_nodes.go | 94 ---------------- .../pd-simulator/simulator/cases/hot_read.go | 7 +- .../pd-simulator/simulator/cases/hot_write.go | 7 +- .../simulator/cases/makeup_down_replica.go | 7 +- .../simulator/cases/region_merge.go | 7 +- tools/pd-simulator/simulator/conn.go | 10 ++ tools/pd-simulator/simulator/drive.go | 59 +++++++--- tools/pd-simulator/simulator/event.go | 106 ++++++++++++------ tools/pd-simulator/simulator/simutil/id.go | 39 +++++++ 15 files changed, 190 insertions(+), 371 deletions(-) delete mode 100644 tools/pd-simulator/simulator/cases/add_nodes.go delete mode 100644 tools/pd-simulator/simulator/cases/add_nodes_dynamic.go delete mode 100644 tools/pd-simulator/simulator/cases/delete_nodes.go create mode 100644 tools/pd-simulator/simulator/simutil/id.go diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 04de914f5f0..45b3ecd75c9 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -17,8 +17,6 @@ package main import ( "context" "fmt" - "net/http" - "net/http/pprof" "os" "os/signal" "syscall" @@ -26,7 +24,6 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/log" - "github.com/prometheus/client_golang/prometheus/promhttp" flag "github.com/spf13/pflag" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" @@ -95,8 +92,7 @@ func main() { func run(simCase string, simConfig *sc.SimConfig) { if *pdAddr != "" { - go runHTTPServer() - simStart(*pdAddr, simCase, simConfig) + simStart(*pdAddr, *statusAddress, simCase, simConfig) } else { local, clean := NewSingleServer(context.Background(), simConfig) err := local.Run() @@ -109,28 +105,10 @@ func run(simCase string, simConfig *sc.SimConfig) { } time.Sleep(100 * time.Millisecond) } - simStart(local.GetAddr(), simCase, simConfig, clean) + simStart(local.GetAddr(), "", simCase, simConfig, clean) } } -func runHTTPServer() { - http.Handle("/metrics", promhttp.Handler()) - // profile API - http.HandleFunc("/pprof/profile", pprof.Profile) - http.HandleFunc("/pprof/trace", pprof.Trace) - http.HandleFunc("/pprof/symbol", pprof.Symbol) - http.Handle("/pprof/heap", pprof.Handler("heap")) - http.Handle("/pprof/mutex", pprof.Handler("mutex")) - http.Handle("/pprof/allocs", pprof.Handler("allocs")) - http.Handle("/pprof/block", pprof.Handler("block")) - http.Handle("/pprof/goroutine", pprof.Handler("goroutine")) - server := &http.Server{ - Addr: *statusAddress, - ReadHeaderTimeout: 3 * time.Second, - } - server.ListenAndServe() -} - // NewSingleServer creates a pd server for simulator. func NewSingleServer(ctx context.Context, simConfig *sc.SimConfig) (*server.Server, testutil.CleanupFunc) { err := logutil.SetupLogger(simConfig.ServerConfig.Log, &simConfig.ServerConfig.Logger, &simConfig.ServerConfig.LogProps) @@ -157,9 +135,9 @@ func cleanServer(cfg *config.Config) { os.RemoveAll(cfg.DataDir) } -func simStart(pdAddr string, simCase string, simConfig *sc.SimConfig, clean ...testutil.CleanupFunc) { +func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimConfig, clean ...testutil.CleanupFunc) { start := time.Now() - driver, err := simulator.NewDriver(pdAddr, simCase, simConfig) + driver, err := simulator.NewDriver(pdAddr, statusAddress, simCase, simConfig) if err != nil { simutil.Logger.Fatal("create driver error", zap.Error(err)) } diff --git a/tools/pd-simulator/simulator/cases/add_nodes.go b/tools/pd-simulator/simulator/cases/add_nodes.go deleted file mode 100644 index 5c73fe9764c..00000000000 --- a/tools/pd-simulator/simulator/cases/add_nodes.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2017 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cases - -import ( - "github.com/docker/go-units" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core" - sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" - "github.com/tikv/pd/tools/pd-simulator/simulator/info" -) - -func newAddNodes(config *sc.SimConfig) *Case { - var simCase Case - - totalStore := config.TotalStore - totalRegion := config.TotalRegion - replica := int(config.ServerConfig.Replication.MaxReplicas) - noEmptyStoreNum := getNoEmptyStoreNum(totalStore, replica) - - for i := 0; i < totalStore; i++ { - simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), - Status: metapb.StoreState_Up, - }) - } - - for i := 0; i < totalRegion; i++ { - peers := make([]*metapb.Peer, 0, replica) - for j := 0; j < replica; j++ { - peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), - StoreId: uint64((i+j)%noEmptyStoreNum + 1), - }) - } - simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), - Peers: peers, - Leader: peers[0], - Size: 96 * units.MiB, - Keys: 960000, - }) - } - - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - for i := 1; i <= totalStore; i++ { - leaderCount := regions.GetStoreLeaderCount(uint64(i)) - peerCount := regions.GetStoreRegionCount(uint64(i)) - if !isUniform(leaderCount, totalRegion/totalStore) { - return false - } - if !isUniform(peerCount, totalRegion*replica/totalStore) { - return false - } - } - return true - } - return &simCase -} diff --git a/tools/pd-simulator/simulator/cases/add_nodes_dynamic.go b/tools/pd-simulator/simulator/cases/add_nodes_dynamic.go deleted file mode 100644 index aa585b48923..00000000000 --- a/tools/pd-simulator/simulator/cases/add_nodes_dynamic.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2018 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cases - -import ( - "github.com/docker/go-units" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core" - sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" - "github.com/tikv/pd/tools/pd-simulator/simulator/info" -) - -func newAddNodesDynamic(config *sc.SimConfig) *Case { - var simCase Case - - totalStore := config.TotalStore - totalRegion := config.TotalRegion - replica := int(config.ServerConfig.Replication.MaxReplicas) - noEmptyStoreNum := getNoEmptyStoreNum(totalStore, replica) - - for i := 0; i < noEmptyStoreNum; i++ { - simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), - Status: metapb.StoreState_Up, - }) - } - - var ids []uint64 - for i := 0; i < totalStore-noEmptyStoreNum; i++ { - ids = append(ids, IDAllocator.nextID()) - } - - for i := 0; i < totalRegion; i++ { - peers := make([]*metapb.Peer, 0, replica) - for j := 0; j < replica; j++ { - peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), - StoreId: uint64((i+j)%noEmptyStoreNum + 1), - }) - } - simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), - Peers: peers, - Leader: peers[0], - Size: 96 * units.MiB, - Keys: 960000, - }) - } - - currentStoreCount := noEmptyStoreNum - e := &AddNodesDescriptor{} - e.Step = func(tick int64) uint64 { - if tick%100 == 0 && currentStoreCount < totalStore { - currentStoreCount++ - nodeID := ids[0] - ids = append(ids[:0], ids[1:]...) - return nodeID - } - return 0 - } - simCase.Events = []EventDescriptor{e} - - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - if currentStoreCount != totalStore { - return false - } - for i := 1; i <= currentStoreCount; i++ { - leaderCount := regions.GetStoreLeaderCount(uint64(i)) - peerCount := regions.GetStoreRegionCount(uint64(i)) - if !isUniform(leaderCount, totalRegion/totalStore) { - return false - } - if !isUniform(peerCount, totalRegion*replica/totalStore) { - return false - } - } - return true - } - return &simCase -} diff --git a/tools/pd-simulator/simulator/cases/balance_leader.go b/tools/pd-simulator/simulator/cases/balance_leader.go index c5315f85d8e..fd9028bc91a 100644 --- a/tools/pd-simulator/simulator/cases/balance_leader.go +++ b/tools/pd-simulator/simulator/cases/balance_leader.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newBalanceLeader(config *sc.SimConfig) *Case { @@ -30,7 +31,7 @@ func newBalanceLeader(config *sc.SimConfig) *Case { replica := int(config.ServerConfig.Replication.MaxReplicas) for i := 0; i < totalStore; i++ { simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, }) } @@ -39,17 +40,17 @@ func newBalanceLeader(config *sc.SimConfig) *Case { for i := 0; i < totalRegion; i++ { peers := make([]*metapb.Peer, 0, replica) peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: leaderStoreID, }) for j := 1; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%(totalStore-1) + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], Size: 96 * units.MiB, diff --git a/tools/pd-simulator/simulator/cases/balance_region.go b/tools/pd-simulator/simulator/cases/balance_region.go index a559a335c97..82a7ac2d704 100644 --- a/tools/pd-simulator/simulator/cases/balance_region.go +++ b/tools/pd-simulator/simulator/cases/balance_region.go @@ -21,6 +21,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newRedundantBalanceRegion(config *sc.SimConfig) *Case { @@ -32,7 +33,7 @@ func newRedundantBalanceRegion(config *sc.SimConfig) *Case { for i := 0; i < totalStore; i++ { s := &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, } if i%2 == 1 { @@ -45,12 +46,12 @@ func newRedundantBalanceRegion(config *sc.SimConfig) *Case { peers := make([]*metapb.Peer, 0, replica) for j := 0; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%totalStore + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], }) diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go index f2e79a81924..00b5404669f 100644 --- a/tools/pd-simulator/simulator/cases/cases.go +++ b/tools/pd-simulator/simulator/cases/cases.go @@ -15,8 +15,6 @@ package cases import ( - "math/rand" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" @@ -91,9 +89,6 @@ var IDAllocator idAllocator var CaseMap = map[string]func(*config.SimConfig) *Case{ "balance-leader": newBalanceLeader, "redundant-balance-region": newRedundantBalanceRegion, - "add-nodes": newAddNodes, - "add-nodes-dynamic": newAddNodesDynamic, - "delete-nodes": newDeleteNodes, "region-split": newRegionSplit, "region-merge": newRegionMerge, "hot-read": newHotRead, @@ -121,14 +116,3 @@ func isUniform(count, meanCount int) bool { minCount := int((1.0 - threshold) * float64(meanCount)) return minCount <= count && count <= maxCount } - -func getNoEmptyStoreNum(storeNum int, replica int) int { - noEmptyStoreNum := rand.Intn(storeNum) - if noEmptyStoreNum < replica { - return replica - } - if noEmptyStoreNum == storeNum { - return storeNum - 1 - } - return noEmptyStoreNum -} diff --git a/tools/pd-simulator/simulator/cases/delete_nodes.go b/tools/pd-simulator/simulator/cases/delete_nodes.go deleted file mode 100644 index 80650cf109d..00000000000 --- a/tools/pd-simulator/simulator/cases/delete_nodes.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2018 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cases - -import ( - "math/rand" - - "github.com/docker/go-units" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core" - sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" - "github.com/tikv/pd/tools/pd-simulator/simulator/info" -) - -func newDeleteNodes(config *sc.SimConfig) *Case { - var simCase Case - - totalStore := config.TotalStore - totalRegion := config.TotalRegion - replica := int(config.ServerConfig.Replication.MaxReplicas) - noEmptyStoreNum := totalStore - 1 - for i := 1; i <= totalStore; i++ { - simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), - Status: metapb.StoreState_Up, - }) - } - - for i := 0; i < totalRegion; i++ { - peers := make([]*metapb.Peer, 0, replica) - for j := 0; j < replica; j++ { - peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), - StoreId: uint64((i+j)%totalStore + 1), - }) - } - simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), - Peers: peers, - Leader: peers[0], - Size: 96 * units.MiB, - Keys: 960000, - }) - } - - ids := make([]uint64, 0, len(simCase.Stores)) - for _, store := range simCase.Stores { - ids = append(ids, store.ID) - } - - currentStoreCount := totalStore - e := &DeleteNodesDescriptor{} - e.Step = func(tick int64) uint64 { - if currentStoreCount > noEmptyStoreNum && tick%100 == 0 { - idx := rand.Intn(currentStoreCount) - currentStoreCount-- - nodeID := ids[idx] - ids = append(ids[:idx], ids[idx+1:]...) - return nodeID - } - return 0 - } - simCase.Events = []EventDescriptor{e} - - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - if currentStoreCount != noEmptyStoreNum { - return false - } - for _, i := range ids { - leaderCount := regions.GetStoreLeaderCount(i) - peerCount := regions.GetStoreRegionCount(i) - if !isUniform(leaderCount, totalRegion/noEmptyStoreNum) { - return false - } - if !isUniform(peerCount, totalRegion*replica/noEmptyStoreNum) { - return false - } - } - return true - } - return &simCase -} diff --git a/tools/pd-simulator/simulator/cases/hot_read.go b/tools/pd-simulator/simulator/cases/hot_read.go index 50ad08d6011..d154886b0a4 100644 --- a/tools/pd-simulator/simulator/cases/hot_read.go +++ b/tools/pd-simulator/simulator/cases/hot_read.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newHotRead(config *sc.SimConfig) *Case { @@ -31,7 +32,7 @@ func newHotRead(config *sc.SimConfig) *Case { // Initialize the cluster for i := 0; i < totalStore; i++ { simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, }) } @@ -40,12 +41,12 @@ func newHotRead(config *sc.SimConfig) *Case { peers := make([]*metapb.Peer, 0, replica) for j := 0; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%totalStore + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], Size: 96 * units.MiB, diff --git a/tools/pd-simulator/simulator/cases/hot_write.go b/tools/pd-simulator/simulator/cases/hot_write.go index a30afd1a8ec..e73ca6f3ce3 100644 --- a/tools/pd-simulator/simulator/cases/hot_write.go +++ b/tools/pd-simulator/simulator/cases/hot_write.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newHotWrite(config *sc.SimConfig) *Case { @@ -31,7 +32,7 @@ func newHotWrite(config *sc.SimConfig) *Case { // Initialize the cluster for i := 0; i < totalStore; i++ { simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, }) } @@ -40,12 +41,12 @@ func newHotWrite(config *sc.SimConfig) *Case { peers := make([]*metapb.Peer, 0, replica) for j := 0; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%totalStore + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], Size: 96 * units.MiB, diff --git a/tools/pd-simulator/simulator/cases/makeup_down_replica.go b/tools/pd-simulator/simulator/cases/makeup_down_replica.go index 28de9577cfc..a5ee63e71a0 100644 --- a/tools/pd-simulator/simulator/cases/makeup_down_replica.go +++ b/tools/pd-simulator/simulator/cases/makeup_down_replica.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newMakeupDownReplicas(config *sc.SimConfig) *Case { @@ -31,7 +32,7 @@ func newMakeupDownReplicas(config *sc.SimConfig) *Case { noEmptyStoreNum := totalStore - 1 for i := 0; i < totalStore; i++ { simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, }) } @@ -40,12 +41,12 @@ func newMakeupDownReplicas(config *sc.SimConfig) *Case { peers := make([]*metapb.Peer, 0, replica) for j := 0; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%totalStore + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], Size: 96 * units.MiB, diff --git a/tools/pd-simulator/simulator/cases/region_merge.go b/tools/pd-simulator/simulator/cases/region_merge.go index 953b0e309e1..8097565d1a7 100644 --- a/tools/pd-simulator/simulator/cases/region_merge.go +++ b/tools/pd-simulator/simulator/cases/region_merge.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) func newRegionMerge(config *sc.SimConfig) *Case { @@ -30,7 +31,7 @@ func newRegionMerge(config *sc.SimConfig) *Case { for i := 0; i < totalStore; i++ { simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Status: metapb.StoreState_Up, }) } @@ -39,12 +40,12 @@ func newRegionMerge(config *sc.SimConfig) *Case { peers := make([]*metapb.Peer, 0, replica) for j := 0; j < replica; j++ { peers = append(peers, &metapb.Peer{ - Id: IDAllocator.nextID(), + Id: simutil.IDAllocator.NextID(), StoreId: uint64((i+j)%totalStore + 1), }) } simCase.Regions = append(simCase.Regions, Region{ - ID: IDAllocator.nextID(), + ID: simutil.IDAllocator.NextID(), Peers: peers, Leader: peers[0], Size: 10 * units.MiB, diff --git a/tools/pd-simulator/simulator/conn.go b/tools/pd-simulator/simulator/conn.go index b95b33ee63d..4be8a2b76dc 100644 --- a/tools/pd-simulator/simulator/conn.go +++ b/tools/pd-simulator/simulator/conn.go @@ -52,3 +52,13 @@ func (c *Connection) nodeHealth(storeID uint64) bool { return n.GetNodeState() == metapb.NodeState_Preparing || n.GetNodeState() == metapb.NodeState_Serving } + +func (c *Connection) getNodes() []*Node { + var nodes []*Node + for _, n := range c.Nodes { + if n.GetNodeState() != metapb.NodeState_Removed { + nodes = append(nodes, n) + } + } + return nodes +} diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index 3d2bce74675..700dd58f87a 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -16,6 +16,8 @@ package simulator import ( "context" + "net/http" + "net/http/pprof" "path" "strconv" "sync" @@ -23,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tools/pd-simulator/simulator/cases" @@ -35,20 +38,21 @@ import ( // Driver promotes the cluster status change. type Driver struct { - wg sync.WaitGroup - pdAddr string - simCase *cases.Case - client Client - tickCount int64 - eventRunner *EventRunner - raftEngine *RaftEngine - conn *Connection - simConfig *config.SimConfig - pdConfig *config.PDConfig + wg sync.WaitGroup + pdAddr string + statusAddress string + simCase *cases.Case + client Client + tickCount int64 + eventRunner *EventRunner + raftEngine *RaftEngine + conn *Connection + simConfig *config.SimConfig + pdConfig *config.PDConfig } // NewDriver returns a driver. -func NewDriver(pdAddr string, caseName string, simConfig *config.SimConfig) (*Driver, error) { +func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConfig) (*Driver, error) { simCase := cases.NewCase(caseName, simConfig) if simCase == nil { return nil, errors.Errorf("failed to create case %s", caseName) @@ -57,10 +61,11 @@ func NewDriver(pdAddr string, caseName string, simConfig *config.SimConfig) (*Dr pdConfig.PlacementRules = simCase.Rules pdConfig.LocationLabels = simCase.Labels return &Driver{ - pdAddr: pdAddr, - simCase: simCase, - simConfig: simConfig, - pdConfig: pdConfig, + pdAddr: pdAddr, + statusAddress: statusAddress, + simCase: simCase, + simConfig: simConfig, + pdConfig: pdConfig, }, nil } @@ -77,6 +82,9 @@ func (d *Driver) Prepare() error { d.updateNodeAvailable() + if d.statusAddress != "" { + go d.runHTTPServer() + } // Bootstrap. store, region, err := d.GetBootstrapInfo(d.raftEngine) if err != nil { @@ -95,7 +103,7 @@ func (d *Driver) Prepare() error { // Setup alloc id. // TODO: This is a hack way. Once we have reset alloc ID API, we need to replace it. - maxID := cases.IDAllocator.GetID() + maxID := simutil.IDAllocator.GetID() requestTimeout := 10 * time.Second etcdTimeout := 3 * time.Second etcdClient, err := clientv3.New(clientv3.Config{ @@ -123,7 +131,7 @@ func (d *Driver) Prepare() error { return errors.WithStack(err) } if id > maxID { - cases.IDAllocator.ResetID() + simutil.IDAllocator.ResetID() break } } @@ -226,3 +234,20 @@ func (d *Driver) updateNodeAvailable() { } } } + +func (d *Driver) runHTTPServer() { + http.Handle("/metrics", promhttp.Handler()) + // profile API + http.HandleFunc("/pprof/profile", pprof.Profile) + http.HandleFunc("/pprof/trace", pprof.Trace) + http.HandleFunc("/pprof/symbol", pprof.Symbol) + http.Handle("/pprof/heap", pprof.Handler("heap")) + http.Handle("/pprof/mutex", pprof.Handler("mutex")) + http.Handle("/pprof/allocs", pprof.Handler("allocs")) + http.Handle("/pprof/block", pprof.Handler("block")) + http.Handle("/pprof/goroutine", pprof.Handler("goroutine")) + eventHandler := newEventHandler(d.eventRunner) + http.HandleFunc("/event", eventHandler.createEvent) + // nolint + http.ListenAndServe(d.statusAddress, nil) +} diff --git a/tools/pd-simulator/simulator/event.go b/tools/pd-simulator/simulator/event.go index 04ad10a0db8..8be8f89d759 100644 --- a/tools/pd-simulator/simulator/event.go +++ b/tools/pd-simulator/simulator/event.go @@ -15,6 +15,12 @@ package simulator import ( + "context" + "fmt" + "math/rand" + "net/http" + "sync" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" @@ -30,6 +36,7 @@ type Event interface { // EventRunner includes all events. type EventRunner struct { + sync.RWMutex events []Event raftEngine *RaftEngine } @@ -46,6 +53,33 @@ func NewEventRunner(events []cases.EventDescriptor, raftEngine *RaftEngine) *Eve return er } +type eventHandler struct { + er *EventRunner +} + +func newEventHandler(er *EventRunner) *eventHandler { + return &eventHandler{ + er: er, + } +} + +func (e *eventHandler) createEvent(w http.ResponseWriter, r *http.Request) { + event := r.URL.Query().Get("event") + if len(event) < 1 { + fmt.Fprintf(w, "no given event") + return + } + switch event { + case "add-node": + e.er.addEvent(&AddNode{}) + return + case "down-node": + e.er.addEvent(&DownNode{}) + return + default: + } +} + func parserEvent(e cases.EventDescriptor) Event { switch t := e.(type) { case *cases.WriteFlowOnSpotDescriptor: @@ -54,16 +88,20 @@ func parserEvent(e cases.EventDescriptor) Event { return &WriteFlowOnRegion{descriptor: t} case *cases.ReadFlowOnRegionDescriptor: return &ReadFlowOnRegion{descriptor: t} - case *cases.AddNodesDescriptor: - return &AddNodes{descriptor: t} - case *cases.DeleteNodesDescriptor: - return &DeleteNodes{descriptor: t} } return nil } +func (er *EventRunner) addEvent(e Event) { + er.Lock() + defer er.Unlock() + er.events = append(er.events, e) +} + // Tick ticks the event run func (er *EventRunner) Tick(tickCount int64) { + er.Lock() + defer er.Unlock() var finishedIndex int for i, e := range er.events { isFinished := e.Run(er.raftEngine, tickCount) @@ -126,24 +164,18 @@ func (e *ReadFlowOnRegion) Run(raft *RaftEngine, tickCount int64) bool { return false } -// AddNodes adds nodes. -type AddNodes struct { - descriptor *cases.AddNodesDescriptor -} +// AddNode adds nodes. +type AddNode struct{} // Run implements the event interface. -func (e *AddNodes) Run(raft *RaftEngine, tickCount int64) bool { - id := e.descriptor.Step(tickCount) - if id == 0 { - return false - } - - if _, ok := raft.conn.Nodes[id]; ok { - simutil.Logger.Info("node has already existed", zap.Uint64("node-id", id)) +func (*AddNode) Run(raft *RaftEngine, _ int64) bool { + config := raft.storeConfig + nodes := raft.conn.getNodes() + id, err := nodes[0].client.AllocID(context.TODO()) + if err != nil { + simutil.Logger.Error("alloc node id failed", zap.Error(err)) return false } - - config := raft.storeConfig s := &cases.Store{ ID: id, Status: metapb.StoreState_Up, @@ -152,49 +184,51 @@ func (e *AddNodes) Run(raft *RaftEngine, tickCount int64) bool { } n, err := NewNode(s, raft.conn.pdAddr, config) if err != nil { - simutil.Logger.Error("add node failed", zap.Uint64("node-id", id), zap.Error(err)) + simutil.Logger.Error("create node failed", zap.Error(err)) return false } - raft.conn.Nodes[id] = n + + raft.conn.Nodes[s.ID] = n n.raftEngine = raft err = n.Start() if err != nil { - simutil.Logger.Error("start node failed", zap.Uint64("node-id", id), zap.Error(err)) + delete(raft.conn.Nodes, s.ID) + simutil.Logger.Error("start node failed", zap.Uint64("node-id", s.ID), zap.Error(err)) + return false } - return false + return true } -// DeleteNodes deletes nodes. -type DeleteNodes struct { - descriptor *cases.DeleteNodesDescriptor -} +// DownNode deletes nodes. +type DownNode struct{} // Run implements the event interface. -func (e *DeleteNodes) Run(raft *RaftEngine, tickCount int64) bool { - id := e.descriptor.Step(tickCount) - if id == 0 { +func (*DownNode) Run(raft *RaftEngine, _ int64) bool { + nodes := raft.conn.getNodes() + if len(nodes) == 0 { + simutil.Logger.Error("can not find any node") return false } - - node := raft.conn.Nodes[id] + i := rand.Intn(len(nodes)) + node := nodes[i] if node == nil { - simutil.Logger.Error("node is not existed", zap.Uint64("node-id", id)) + simutil.Logger.Error("node is not existed", zap.Uint64("node-id", node.Id)) return false } - delete(raft.conn.Nodes, id) + delete(raft.conn.Nodes, node.Id) node.Stop() regions := raft.GetRegions() for _, region := range regions { storeIDs := region.GetStoreIDs() - if _, ok := storeIDs[id]; ok { + if _, ok := storeIDs[node.Id]; ok { downPeer := &pdpb.PeerStats{ - Peer: region.GetStorePeer(id), + Peer: region.GetStorePeer(node.Id), DownSeconds: 24 * 60 * 60, } region = region.Clone(core.WithDownPeers(append(region.GetDownPeers(), downPeer))) raft.SetRegion(region) } } - return false + return true } diff --git a/tools/pd-simulator/simulator/simutil/id.go b/tools/pd-simulator/simulator/simutil/id.go new file mode 100644 index 00000000000..8badddff3f1 --- /dev/null +++ b/tools/pd-simulator/simulator/simutil/id.go @@ -0,0 +1,39 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package simutil + +// IDAllocator is used to alloc unique ID. +type idAllocator struct { + id uint64 +} + +// NextID gets the next unique ID. +func (a *idAllocator) NextID() uint64 { + a.id++ + return a.id +} + +// ResetID resets the IDAllocator. +func (a *idAllocator) ResetID() { + a.id = 0 +} + +// GetID gets the current ID. +func (a *idAllocator) GetID() uint64 { + return a.id +} + +// IDAllocator is used to alloc unique ID. +var IDAllocator idAllocator