From 04e7c36bb4ea4fa91c498da995285f1290069b2f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 10 Jan 2019 10:13:16 +0800 Subject: [PATCH] *: handle more errors properly (#1405) * handle errors properly Signed-off-by: rleungx --- client/client.go | 5 +- client/client_test.go | 2 +- server/api/cluster_test.go | 2 +- server/api/config.go | 15 +- server/api/config_test.go | 4 +- server/api/diagnose.go | 10 +- server/api/diagnose_test.go | 2 +- server/api/health_test.go | 2 +- server/api/member_test.go | 2 +- server/api/server_test.go | 2 +- server/api/status_test.go | 2 +- server/api/store_ns_test.go | 2 +- server/api/trend_test.go | 6 +- server/cluster_info.go | 6 +- server/cluster_info_test.go | 26 +- server/cluster_test.go | 51 ++-- server/cluster_worker_test.go | 9 +- server/config_test.go | 12 +- server/coordinator_test.go | 291 ++++++++++--------- server/core/kv.go | 2 +- server/heartbeat_stream_test.go | 30 +- server/id_test.go | 2 +- server/join_test.go | 2 +- server/leader_test.go | 5 +- server/namespace_test.go | 58 ++-- server/option.go | 7 +- server/region_statistics_test.go | 3 +- server/region_syncer/client.go | 4 +- server/region_syncer/server.go | 4 +- server/schedule/basic_cluster.go | 6 +- server/schedulers/balance_test.go | 5 +- server/server.go | 18 +- server/server_test.go | 6 +- server/store_statistics_test.go | 3 +- server/testutil.go | 13 +- table/namespace_classifier_test.go | 4 +- tests/cluster.go | 10 +- tests/cmd/pdctl_test.go | 61 ++-- tests/server/leader_watch_test.go | 6 +- tools/pd-ctl/pdctl/command/global.go | 9 +- tools/pd-ctl/pdctl/command/region_command.go | 5 +- tools/pd-ctl/pdctl/ctl.go | 4 +- tools/pd-simulator/main.go | 4 +- tools/pd-simulator/simulator/config.go | 4 +- 44 files changed, 400 insertions(+), 326 deletions(-) diff --git a/client/client.go b/client/client.go index 7a637c8c0fa..0f7fdd463c9 100644 --- a/client/client.go +++ b/client/client.go @@ -23,7 +23,7 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pkg/errors" @@ -363,6 +363,7 @@ func (c *client) tsLoop() { if err != nil { select { case <-loopCtx.Done(): + cancel() return default: } @@ -395,6 +396,7 @@ func (c *client) tsLoop() { select { case c.tsDeadlineCh <- dl: case <-loopCtx.Done(): + cancel() return } opts = extractSpanReference(requests, opts[:0]) @@ -409,6 +411,7 @@ func (c *client) tsLoop() { if err != nil { select { case <-loopCtx.Done(): + cancel() return default: } diff --git a/client/client_test.go b/client/client_test.go index 23b0a1142fd..6bd5dfa9117 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -63,7 +63,7 @@ type testClientSuite struct { func (s *testClientSuite) SetUpSuite(c *C) { var err error - _, s.srv, s.cleanup, err = server.NewTestServer() + _, s.srv, s.cleanup, err = server.NewTestServer(c) c.Assert(err, IsNil) s.grpcPDClient = mustNewGrpcClient(c, s.srv.GetAddr()) diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index ebca347aa84..7f5d71d0d81 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -50,7 +50,7 @@ func (s *testClusterInfo) TestCluster(c *C) { c2 := &metapb.Cluster{} r := server.ReplicationConfig{MaxReplicas: 6} - s.svr.SetReplicationConfig(r) + c.Assert(s.svr.SetReplicationConfig(r), IsNil) err = readJSONWithURL(url, c2) c.Assert(err, IsNil) diff --git a/server/api/config.go b/server/api/config.go index 
4621a69b1d5..e42e93c4d67 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -104,7 +104,10 @@ func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) { return } - h.svr.SetReplicationConfig(*config) + if err := h.svr.SetReplicationConfig(*config); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, nil) } @@ -136,7 +139,10 @@ func (h *confHandler) SetNamespace(w http.ResponseWriter, r *http.Request) { return } - h.svr.SetNamespaceConfig(name, *config) + if err := h.svr.SetNamespaceConfig(name, *config); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, nil) } @@ -148,8 +154,11 @@ func (h *confHandler) DeleteNamespace(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("invalid namespace Name %s, not found", name)) return } - h.svr.DeleteNamespaceConfig(name) + if err := h.svr.DeleteNamespaceConfig(name); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, nil) } diff --git a/server/api/config_test.go b/server/api/config_test.go index 02bc9876271..024f378204c 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -76,7 +76,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) { resp, err := doGet(addr) c.Assert(err, IsNil) sc := &server.ScheduleConfig{} - readJSON(resp.Body, sc) + c.Assert(readJSON(resp.Body, sc), IsNil) sc.MaxStoreDownTime.Duration = time.Second postData, err := json.Marshal(sc) @@ -88,7 +88,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) { resp, err = doGet(addr) c.Assert(err, IsNil) sc1 := &server.ScheduleConfig{} - readJSON(resp.Body, sc1) + c.Assert(readJSON(resp.Body, sc1), IsNil) c.Assert(*sc, DeepEquals, *sc1) } diff --git a/server/api/diagnose.go b/server/api/diagnose.go index 3a79daede74..e54f9996a64 100644 --- a/server/api/diagnose.go +++ b/server/api/diagnose.go @@ -38,19 +38,15 @@ type Recommendation struct { //lint:file-ignore U1000 document available levels and modules const ( // analyze levels - levelNormal = "Normal" levelWarning = "Warning" levelMinor = "Minor" levelMajor = "Major" levelCritical = "Critical" - levelFatal = "Fatal" // analyze modules - modMember = "member" - modTiKV = "TiKV" - modReplica = "Replic" - modSchedule = "Schedule" - modDefault = "Default" + modMember = "member" + modTiKV = "TiKV" + modDefault = "Default" memberOneInstance diagnoseType = iota memberEvenInstance diff --git a/server/api/diagnose_test.go b/server/api/diagnose_test.go index 61f019d4fd8..122292e958c 100644 --- a/server/api/diagnose_test.go +++ b/server/api/diagnose_test.go @@ -34,7 +34,7 @@ func (s *testDiagnoseAPISuite) SetUpSuite(c *C) { func checkDiagnoseResponse(c *C, body []byte) { got := []Recommendation{} - json.Unmarshal(body, &got) + c.Assert(json.Unmarshal(body, &got), IsNil) for _, r := range got { c.Assert(len(r.Module) != 0, IsTrue) c.Assert(len(r.Level) != 0, IsTrue) diff --git a/server/api/health_test.go b/server/api/health_test.go index d0d5216c15c..7f21cc47c1e 100644 --- a/server/api/health_test.go +++ b/server/api/health_test.go @@ -35,7 +35,7 @@ func (s *testHealthAPISuite) SetUpSuite(c *C) { func checkSliceResponse(c *C, body []byte, cfgs []*server.Config, unhealth string) { got := []Health{} - json.Unmarshal(body, &got) + c.Assert(json.Unmarshal(body, &got), IsNil) c.Assert(len(got), Equals, len(cfgs)) diff --git a/server/api/member_test.go 
b/server/api/member_test.go index f262294cec2..e0e396fda7f 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -93,7 +93,7 @@ func (s *testMemberAPISuite) TestMemberLeader(c *C) { c.Assert(err, IsNil) var got pdpb.Member - json.Unmarshal(buf, &got) + c.Assert(json.Unmarshal(buf, &got), IsNil) c.Assert(got.GetClientUrls(), DeepEquals, leader.GetClientUrls()) c.Assert(got.GetMemberId(), Equals, leader.GetMemberId()) } diff --git a/server/api/server_test.go b/server/api/server_test.go index 68d9b6131d3..1d565f3368b 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -75,7 +75,7 @@ func mustNewServer(c *C) (*server.Server, cleanUpFunc) { func mustNewCluster(c *C, num int) ([]*server.Config, []*server.Server, cleanUpFunc) { svrs := make([]*server.Server, 0, num) - cfgs := server.NewTestMultiConfig(num) + cfgs := server.NewTestMultiConfig(c, num) ch := make(chan *server.Server, num) for _, cfg := range cfgs { diff --git a/server/api/status_test.go b/server/api/status_test.go index 4e3d3011f79..fcfeeb2b179 100644 --- a/server/api/status_test.go +++ b/server/api/status_test.go @@ -34,7 +34,7 @@ func (s *testStatusAPISuite) SetUpSuite(c *C) { func checkStatusResponse(c *C, body []byte, cfgs []*server.Config) { got := status{} - json.Unmarshal(body, &got) + c.Assert(json.Unmarshal(body, &got), IsNil) c.Assert(got.BuildTS, Equals, server.PDBuildTS) c.Assert(got.GitHash, Equals, server.PDGitHash) diff --git a/server/api/store_ns_test.go b/server/api/store_ns_test.go index fe04090c096..afdd22d2ec8 100644 --- a/server/api/store_ns_test.go +++ b/server/api/store_ns_test.go @@ -60,7 +60,7 @@ func (s *testStoreNsSuite) SetUpSuite(c *C) { }, } - cfg := server.NewTestSingleConfig() + cfg := server.NewTestSingleConfig(c) cfg.NamespaceClassifier = "table" svr, err := server.CreateServer(cfg, NewHandler) c.Assert(err, IsNil) diff --git a/server/api/trend_test.go b/server/api/trend_test.go index 7230606c96a..851b607ef13 100644 --- a/server/api/trend_test.go +++ b/server/api/trend_test.go @@ -46,9 +46,9 @@ func (s *testTrendSuite) TestTrend(c *C) { mustRegionHeartbeat(c, svr, region6) // Create 3 operators that transfers leader, moves follower, moves leader. - svr.GetHandler().AddTransferLeaderOperator(4, 2) - svr.GetHandler().AddTransferPeerOperator(5, 2, 3) - svr.GetHandler().AddTransferPeerOperator(6, 1, 3) + c.Assert(svr.GetHandler().AddTransferLeaderOperator(4, 2), IsNil) + c.Assert(svr.GetHandler().AddTransferPeerOperator(5, 2, 3), IsNil) + c.Assert(svr.GetHandler().AddTransferPeerOperator(6, 1, 3), IsNil) // Complete the operators. mustRegionHeartbeat(c, svr, region4.Clone(core.WithLeader(region4.GetStorePeer(2)))) diff --git a/server/cluster_info.go b/server/cluster_info.go index bf19bccc635..22303135a46 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -190,7 +190,8 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error { return err } } - return c.core.PutStore(store) + c.core.PutStore(store) + return nil } // BlockStore stops balancer from selecting the store. 
@@ -313,7 +314,8 @@ func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error { return err } } - return c.core.PutRegion(region) + c.core.PutRegion(region) + return nil } func (c *clusterInfo) getRegions() []*core.RegionInfo { diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 2e74ad4f69c..cf64a0ae3f6 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -264,7 +264,8 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { defer cleanup() kv := server.kv - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) // Cluster is not bootstrapped. cluster, err := loadClusterInfo(server.idAlloc, kv, opt) @@ -295,7 +296,8 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { } func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) n, np := uint64(3), uint64(3) @@ -340,7 +342,8 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { } func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) n, np := uint64(3), uint64(3) @@ -349,7 +352,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { regions := newTestRegions(n, np) for _, store := range stores { - cluster.putStore(store) + c.Assert(cluster.putStore(store), IsNil) } for i, region := range regions { @@ -554,7 +557,8 @@ func heartbeatRegions(c *C, cluster *clusterInfo, regions []*metapb.Region) { } func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil) // 1: [nil, nil) @@ -592,7 +596,8 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { } func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil) regions := []*metapb.Region{ @@ -631,11 +636,12 @@ func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) { } func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) stores := newTestStores(5) for _, s := range stores { - tc.putStore(s) + c.Assert(tc.putStore(s), IsNil) } peers := []*metapb.Peer{ { @@ -656,10 +662,10 @@ func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) { }, } origin := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[:3]}, peers[0], core.WithPendingPeers(peers[1:3])) - tc.handleRegionHeartbeat(origin) + c.Assert(tc.handleRegionHeartbeat(origin), IsNil) checkPendingPeerCount([]int{0, 1, 1, 0}, tc.clusterInfo, c) newRegion := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[1:]}, peers[1], core.WithPendingPeers(peers[3:4])) - tc.handleRegionHeartbeat(newRegion) + c.Assert(tc.handleRegionHeartbeat(newRegion), IsNil) checkPendingPeerCount([]int{0, 0, 0, 1}, tc.clusterInfo, c) } diff --git a/server/cluster_test.go b/server/cluster_test.go index 99b302d09db..9713e8f71d8 100644 --- a/server/cluster_test.go +++ 
b/server/cluster_test.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "github.com/coreos/etcd/clientv3" . "github.com/pingcap/check" gofail "github.com/pingcap/gofail/runtime" "github.com/pingcap/kvproto/pkg/metapb" @@ -37,9 +36,7 @@ const ( var _ = Suite(&testClusterSuite{}) type baseCluster struct { - client *clientv3.Client svr *Server - cleanup CleanupFunc grpcPDClient pdpb.PDClient } @@ -119,12 +116,12 @@ func (s *baseCluster) newRegion(c *C, regionID uint64, startKey []byte, func (s *testClusterSuite) TestBootstrap(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) - s.client = s.svr.client mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) - defer s.cleanup() + defer cleanup() clusterID := s.svr.clusterID // IsBootstrapped returns false. @@ -239,16 +236,17 @@ func (s *baseCluster) getClusterConfig(c *C, clusterID uint64) *metapb.Cluster { func (s *testClusterSuite) TestGetPutConfig(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) - s.client = s.svr.client mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) - defer s.cleanup() + defer cleanup() clusterID := s.svr.clusterID storeAddr := "127.0.0.1:0" - s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, storeAddr)) + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, storeAddr)) + c.Assert(err, IsNil) // Get region. region := s.getRegion(c, clusterID, []byte("abc")) @@ -338,7 +336,7 @@ func (s *baseCluster) resetStoreState(c *C, storeID uint64, state metapb.StoreSt store := cluster.GetStore(storeID) c.Assert(store, NotNil) store.State = state - cluster.putStore(store) + c.Assert(cluster.putStore(store), IsNil) } func (s *baseCluster) testRemoveStore(c *C, clusterID uint64, store *metapb.Store) { @@ -414,19 +412,23 @@ func (s *baseCluster) testRemoveStore(c *C, clusterID uint64, store *metapb.Stor // Make sure PD will not panic if it start and stop again and again. func (s *testClusterSuite) TestRaftClusterRestart(c *C) { - _, svr, cleanup, err := NewTestServer() + var err error + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) defer cleanup() - svr.bootstrapCluster(s.newBootstrapRequest(c, svr.clusterID, "127.0.0.1:0")) + mustWaitLeader(c, []*Server{s.svr}) + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + c.Assert(err, IsNil) - cluster := svr.GetRaftCluster() + cluster := s.svr.GetRaftCluster() c.Assert(cluster, NotNil) cluster.stop() - err = svr.createRaftCluster() + err = s.svr.createRaftCluster() c.Assert(err, IsNil) - cluster = svr.GetRaftCluster() + cluster = s.svr.GetRaftCluster() c.Assert(cluster, NotNil) cluster.stop() } @@ -434,8 +436,9 @@ func (s *testClusterSuite) TestRaftClusterRestart(c *C) { // Make sure PD will not deadlock if it start and stop again and again. 
func (s *testClusterSuite) TestRaftClusterMultipleRestart(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() - defer s.cleanup() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) + defer cleanup() c.Assert(err, IsNil) mustWaitLeader(c, []*Server{s.svr}) _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) @@ -462,12 +465,12 @@ func (s *testClusterSuite) TestRaftClusterMultipleRestart(c *C) { func (s *testClusterSuite) TestGetPDMembers(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) - s.client = s.svr.client mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) - defer s.cleanup() + defer cleanup() req := &pdpb.GetMembersRequest{ Header: newRequestHeader(s.svr.ClusterID()), } @@ -480,13 +483,13 @@ func (s *testClusterSuite) TestGetPDMembers(c *C) { func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + _, s.svr, _, err = NewTestServer(c) c.Assert(err, IsNil) - s.client = s.svr.client mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) storeAddrs := []string{"127.0.1.1:0", "127.0.1.1:1", "127.0.1.1:2"} - s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + c.Assert(err, IsNil) s.svr.cluster.RLock() s.svr.cluster.cachedCluster.Lock() s.svr.cluster.cachedCluster.kv = core.NewKV(core.NewMemoryKV()) diff --git a/server/cluster_worker_test.go b/server/cluster_worker_test.go index 1d699609d62..963eaa36753 100644 --- a/server/cluster_worker_test.go +++ b/server/cluster_worker_test.go @@ -50,13 +50,14 @@ func (s *testClusterWorkerSuite) TestReportBatchSplit(c *C) { func (s *testClusterWorkerSuite) TestValidRequestRegion(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) - s.client = s.svr.client mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) - defer s.cleanup() - s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + defer cleanup() + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + c.Assert(err, IsNil) cluster := s.svr.GetRaftCluster() c.Assert(cluster, NotNil) diff --git a/server/config_test.go b/server/config_test.go index 6af01dabafc..b90a075c40b 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -36,25 +36,27 @@ func (s *testConfigSuite) TestTLS(c *C) { } func (s *testConfigSuite) TestBadFormatJoinAddr(c *C) { - cfg := NewTestSingleConfig() + cfg := NewTestSingleConfig(c) cfg.Join = "127.0.0.1:2379" // Wrong join addr without scheme. 
c.Assert(cfg.Adjust(nil), NotNil) } func (s *testConfigSuite) TestReloadConfig(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) kv := core.NewKV(core.NewMemoryKV()) scheduleCfg := opt.load() scheduleCfg.MaxSnapshotCount = 10 opt.SetMaxReplicas(5) opt.loadPDServerConfig().UseRegionStorage = true - opt.persist(kv) + c.Assert(opt.persist(kv), IsNil) // suppose we add a new default enable scheduler "adjacent-region" defaultSchedulers := []string{"balance-region", "balance-leader", "hot-region", "label", "adjacent-region"} - _, newOpt := newTestScheduleConfig() + _, newOpt, err := newTestScheduleConfig() + c.Assert(err, IsNil) newOpt.AddSchedulerCfg("adjacent-region", []string{}) - newOpt.reload(kv) + c.Assert(newOpt.reload(kv), IsNil) schedulers := newOpt.GetSchedulers() c.Assert(schedulers, HasLen, 5) c.Assert(newOpt.loadPDServerConfig().UseRegionStorage, IsTrue) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 8fff7498ee4..1d5d3597550 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -34,12 +34,14 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind sche return schedule.NewOperator("test", regionID, regionEpoch, kind) } -func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) { +func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) { cfg := NewConfig() - cfg.Adjust(nil) + if err := cfg.Adjust(nil); err != nil { + return nil, nil, err + } opt := newScheduleOption(cfg) opt.SetClusterVersion(MinSupportedVersion(Version2_0)) - return &cfg.Schedule, opt + return &cfg.Schedule, opt, nil } type testClusterInfo struct { @@ -63,7 +65,7 @@ func newTestRegionMeta(regionID uint64) *metapb.Region { } } -func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) { +func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) error { store := core.NewStoreInfo(&metapb.Store{Id: storeID}) store.Stats = &pdpb.StoreStats{} store.LastHeartbeatTS = time.Now() @@ -71,10 +73,10 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) { store.RegionSize = int64(regionCount) * 10 store.Stats.Capacity = 1000 * (1 << 20) store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize) - c.putStore(store) + return c.putStore(store) } -func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) { +func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) error { region := newTestRegionMeta(regionID) leader, _ := c.AllocPeer(leaderID) region.Peers = []*metapb.Peer{leader} @@ -83,39 +85,39 @@ func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, foll region.Peers = append(region.Peers, peer) } regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) - c.putRegion(regionInfo) + return c.putRegion(regionInfo) } -func (c *testClusterInfo) updateLeaderCount(storeID uint64, leaderCount int) { +func (c *testClusterInfo) updateLeaderCount(storeID uint64, leaderCount int) error { store := c.GetStore(storeID) store.LeaderCount = leaderCount store.LeaderSize = int64(leaderCount) * 10 - c.putStore(store) + return c.putStore(store) } -func (c *testClusterInfo) addLeaderStore(storeID uint64, leaderCount int) { +func (c *testClusterInfo) addLeaderStore(storeID uint64, leaderCount int) error { store := core.NewStoreInfo(&metapb.Store{Id: storeID}) store.Stats = 
&pdpb.StoreStats{} store.LastHeartbeatTS = time.Now() store.LeaderCount = leaderCount store.LeaderSize = int64(leaderCount) * 10 - c.putStore(store) + return c.putStore(store) } -func (c *testClusterInfo) setStoreDown(storeID uint64) { +func (c *testClusterInfo) setStoreDown(storeID uint64) error { store := c.GetStore(storeID) store.State = metapb.StoreState_Up store.LastHeartbeatTS = time.Time{} - c.putStore(store) + return c.putStore(store) } -func (c *testClusterInfo) setStoreOffline(storeID uint64) { +func (c *testClusterInfo) setStoreOffline(storeID uint64) error { store := c.GetStore(storeID) store.State = metapb.StoreState_Offline - c.putStore(store) + return c.putStore(store) } -func (c *testClusterInfo) LoadRegion(regionID uint64, followerIds ...uint64) { +func (c *testClusterInfo) LoadRegion(regionID uint64, followerIds ...uint64) error { // regions load from etcd will have no leader region := newTestRegionMeta(regionID) region.Peers = []*metapb.Peer{} @@ -123,7 +125,7 @@ func (c *testClusterInfo) LoadRegion(regionID uint64, followerIds ...uint64) { peer, _ := c.AllocPeer(id) region.Peers = append(region.Peers, peer) } - c.putRegion(core.NewRegionInfo(region, nil)) + return c.putRegion(core.NewRegionInfo(region, nil)) } var _ = Suite(&testCoordinatorSuite{}) @@ -131,7 +133,8 @@ var _ = Suite(&testCoordinatorSuite{}) type testCoordinatorSuite struct{} func (s *testCoordinatorSuite) TestBasic(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID()) defer hbStreams.Close() @@ -139,7 +142,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) oc := co.opController - tc.addLeaderRegion(1, 1) + c.Assert(tc.addLeaderRegion(1, 1), IsNil) op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader) oc.AddOperator(op1) @@ -187,7 +190,8 @@ func newMockHeartbeatStream() *mockHeartbeatStream { } func (s *testCoordinatorSuite) TestDispatch(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() @@ -198,18 +202,18 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { defer co.stop() // Transfer peer from store 4 to store 1. - tc.addRegionStore(4, 40) - tc.addRegionStore(3, 30) - tc.addRegionStore(2, 20) - tc.addRegionStore(1, 10) - tc.addLeaderRegion(1, 2, 3, 4) + c.Assert(tc.addRegionStore(4, 40), IsNil) + c.Assert(tc.addRegionStore(3, 30), IsNil) + c.Assert(tc.addRegionStore(2, 20), IsNil) + c.Assert(tc.addRegionStore(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) // Transfer leader from store 4 to store 2. - tc.updateLeaderCount(4, 50) - tc.updateLeaderCount(3, 30) - tc.updateLeaderCount(2, 20) - tc.updateLeaderCount(1, 10) - tc.addLeaderRegion(2, 4, 3, 2) + c.Assert(tc.updateLeaderCount(4, 50), IsNil) + c.Assert(tc.updateLeaderCount(3, 30), IsNil) + c.Assert(tc.updateLeaderCount(2, 20), IsNil) + c.Assert(tc.updateLeaderCount(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(2, 4, 3, 2), IsNil) // Wait for schedule and turn off balance. waitOperator(c, co, 1) @@ -223,32 +227,36 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { // Transfer peer. 
region := tc.GetRegion(1).Clone() - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitPromoteLearner(c, stream, region, 1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitRemovePeer(c, stream, region, 4) - dispatchHeartbeat(c, co, region, stream) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) // Transfer leader. region = tc.GetRegion(2).Clone() - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitTransferLeader(c, stream, region, 2) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) } -func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) { +func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) error { co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream) - co.cluster.putRegion(region.Clone()) + if err := co.cluster.putRegion(region.Clone()); err != nil { + return err + } co.opController.Dispatch(region) + return nil } func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() @@ -259,7 +267,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { for i := 0; i <= 10; i++ { go func(i int) { for j := 0; j < 10000; j++ { - tc.addRegionStore(uint64(i%5), rand.Intn(200)) + c.Assert(tc.addRegionStore(uint64(i%5), rand.Intn(200)), IsNil) } }(i) } @@ -271,7 +279,8 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { } func (s *testCoordinatorSuite) TestCheckRegion(c *C) { - cfg, opt := newTestScheduleConfig() + cfg, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cfg.DisableLearner = false tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) @@ -280,11 +289,11 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() - tc.addRegionStore(4, 4) - tc.addRegionStore(3, 3) - tc.addRegionStore(2, 2) - tc.addRegionStore(1, 1) - tc.addLeaderRegion(1, 2, 3) + c.Assert(tc.addRegionStore(4, 4), IsNil) + c.Assert(tc.addRegionStore(3, 3), IsNil) + c.Assert(tc.addRegionStore(2, 2), IsNil) + c.Assert(tc.addRegionStore(1, 1), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil) c.Assert(co.checkRegion(tc.GetRegion(1)), IsTrue) waitOperator(c, co, 1) testutil.CheckAddPeer(c, co.opController.GetOperator(1), schedule.OpReplica, 1) @@ -296,7 +305,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { core.WithAddPeer(p), core.WithPendingPeers(append(r.GetPendingPeers(), p)), ) - tc.putRegion(r) + c.Assert(tc.putRegion(r), IsNil) c.Assert(co.checkRegion(tc.GetRegion(1)), IsFalse) co.stop() co.wg.Wait() @@ -309,14 +318,14 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { defer co.wg.Wait() defer co.stop() - tc.addRegionStore(4, 4) - tc.addRegionStore(3, 3) - tc.addRegionStore(2, 2) - tc.addRegionStore(1, 1) - 
tc.putRegion(r) + c.Assert(tc.addRegionStore(4, 4), IsNil) + c.Assert(tc.addRegionStore(3, 3), IsNil) + c.Assert(tc.addRegionStore(2, 2), IsNil) + c.Assert(tc.addRegionStore(1, 1), IsNil) + c.Assert(tc.putRegion(r), IsNil) c.Assert(co.checkRegion(tc.GetRegion(1)), IsFalse) r = r.Clone(core.WithPendingPeers(nil)) - tc.putRegion(r) + c.Assert(tc.putRegion(r), IsNil) c.Assert(co.checkRegion(tc.GetRegion(1)), IsTrue) waitOperator(c, co, 1) op := co.opController.GetOperator(1) @@ -327,7 +336,8 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { func (s *testCoordinatorSuite) TestReplica(c *C) { // Turn off balance. - cfg, opt := newTestScheduleConfig() + cfg, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cfg.LeaderScheduleLimit = 0 cfg.RegionScheduleLimit = 0 @@ -340,25 +350,25 @@ func (s *testCoordinatorSuite) TestReplica(c *C) { defer co.wg.Wait() defer co.stop() - tc.addRegionStore(1, 1) - tc.addRegionStore(2, 2) - tc.addRegionStore(3, 3) - tc.addRegionStore(4, 4) + c.Assert(tc.addRegionStore(1, 1), IsNil) + c.Assert(tc.addRegionStore(2, 2), IsNil) + c.Assert(tc.addRegionStore(3, 3), IsNil) + c.Assert(tc.addRegionStore(4, 4), IsNil) stream := newMockHeartbeatStream() // Add peer to store 1. - tc.addLeaderRegion(1, 2, 3) + c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil) region := tc.GetRegion(1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitPromoteLearner(c, stream, region, 1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. - tc.setStoreDown(3) + c.Assert(tc.setStoreDown(3), IsNil) downPeer := &pdpb.PeerStats{ Peer: region.GetStorePeer(3), DownSeconds: 24 * 60 * 60, @@ -366,33 +376,34 @@ func (s *testCoordinatorSuite) TestReplica(c *C) { region = region.Clone( core.WithDownPeers(append(region.GetDownPeers(), downPeer)), ) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 4) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitPromoteLearner(c, stream, region, 4) region = region.Clone(core.WithDownPeers(nil)) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) // Remove peer from store 4. - tc.addLeaderRegion(2, 1, 2, 3, 4) + c.Assert(tc.addLeaderRegion(2, 1, 2, 3, 4), IsNil) region = tc.GetRegion(2) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitRemovePeer(c, stream, region, 4) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) // Remove offline peer directly when it's pending. 
- tc.addLeaderRegion(3, 1, 2, 3) - tc.setStoreOffline(3) + c.Assert(tc.addLeaderRegion(3, 1, 2, 3), IsNil) + c.Assert(tc.setStoreOffline(3), IsNil) region = tc.GetRegion(3) region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) } func (s *testCoordinatorSuite) TestPeerState(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() @@ -403,11 +414,11 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) { defer co.stop() // Transfer peer from store 4 to store 1. - tc.addRegionStore(1, 10) - tc.addRegionStore(2, 20) - tc.addRegionStore(3, 30) - tc.addRegionStore(4, 40) - tc.addLeaderRegion(1, 2, 3, 4) + c.Assert(tc.addRegionStore(1, 10), IsNil) + c.Assert(tc.addRegionStore(2, 20), IsNil) + c.Assert(tc.addRegionStore(3, 30), IsNil) + c.Assert(tc.addRegionStore(4, 40), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) stream := newMockHeartbeatStream() @@ -418,47 +429,48 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) { region := tc.GetRegion(1).Clone() // Add new peer. - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 1) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitPromoteLearner(c, stream, region, 1) // If the new peer is pending, the operator will not finish. region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) c.Assert(co.opController.GetOperator(region.GetID()), NotNil) // The new peer is not pending now, the operator will finish. // And we will proceed to remove peer in store 4. 
region = region.Clone(core.WithPendingPeers(nil)) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitRemovePeer(c, stream, region, 4) - tc.addLeaderRegion(1, 1, 2, 3) + c.Assert(tc.addLeaderRegion(1, 1, 2, 3), IsNil) region = tc.GetRegion(1).Clone() - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitNoResponse(c, stream) } func (s *testCoordinatorSuite) TestShouldRun(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) - tc.addLeaderStore(1, 5) - tc.addLeaderStore(2, 2) - tc.addLeaderStore(3, 0) - tc.addLeaderStore(4, 0) - tc.LoadRegion(1, 1, 2, 3) - tc.LoadRegion(2, 1, 2, 3) - tc.LoadRegion(3, 1, 2, 3) - tc.LoadRegion(4, 1, 2, 3) - tc.LoadRegion(5, 1, 2, 3) - tc.LoadRegion(6, 2, 1, 4) - tc.LoadRegion(7, 2, 1, 4) + c.Assert(tc.addLeaderStore(1, 5), IsNil) + c.Assert(tc.addLeaderStore(2, 2), IsNil) + c.Assert(tc.addLeaderStore(3, 0), IsNil) + c.Assert(tc.addLeaderStore(4, 0), IsNil) + c.Assert(tc.LoadRegion(1, 1, 2, 3), IsNil) + c.Assert(tc.LoadRegion(2, 1, 2, 3), IsNil) + c.Assert(tc.LoadRegion(3, 1, 2, 3), IsNil) + c.Assert(tc.LoadRegion(4, 1, 2, 3), IsNil) + c.Assert(tc.LoadRegion(5, 1, 2, 3), IsNil) + c.Assert(tc.LoadRegion(6, 2, 1, 4), IsNil) + c.Assert(tc.LoadRegion(7, 2, 1, 4), IsNil) c.Assert(co.shouldRun(), IsFalse) c.Assert(tc.core.Regions.GetStoreRegionCount(4), Equals, 2) @@ -479,18 +491,19 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { for _, t := range tbl { r := tc.GetRegion(t.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0])) - tc.handleRegionHeartbeat(nr) + c.Assert(tc.handleRegionHeartbeat(nr), IsNil) c.Assert(co.shouldRun(), Equals, t.shouldRun) } nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil) - tc.handleRegionHeartbeat(newRegion) + c.Assert(tc.handleRegionHeartbeat(newRegion), NotNil) c.Assert(co.cluster.prepareChecker.sum, Equals, 7) } func (s *testCoordinatorSuite) TestAddScheduler(c *C) { - cfg, opt := newTestScheduleConfig() + cfg, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cfg.ReplicaScheduleLimit = 0 tc := newTestClusterInfo(opt) @@ -511,15 +524,15 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) { stream := newMockHeartbeatStream() // Add stores 1,2,3 - tc.addLeaderStore(1, 1) - tc.addLeaderStore(2, 1) - tc.addLeaderStore(3, 1) + c.Assert(tc.addLeaderStore(1, 1), IsNil) + c.Assert(tc.addLeaderStore(2, 1), IsNil) + c.Assert(tc.addLeaderStore(3, 1), IsNil) // Add regions 1 with leader in store 1 and followers in stores 2,3 - tc.addLeaderRegion(1, 1, 2, 3) + c.Assert(tc.addLeaderRegion(1, 1, 2, 3), IsNil) // Add regions 2 with leader in store 2 and followers in stores 1,3 - tc.addLeaderRegion(2, 2, 1, 3) + c.Assert(tc.addLeaderRegion(2, 2, 1, 3), IsNil) // Add regions 3 with leader in store 3 and followers in stores 1,2 - tc.addLeaderRegion(3, 3, 1, 2) + c.Assert(tc.addLeaderRegion(3, 3, 1, 2), IsNil) oc := co.opController gls, err := schedule.CreateScheduler("grant-leader", oc, "0") @@ -534,21 +547,22 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) { // Transfer all leaders to store 1. 
waitOperator(c, co, 2) region2 := tc.GetRegion(2) - dispatchHeartbeat(c, co, region2, stream) + c.Assert(dispatchHeartbeat(c, co, region2, stream), IsNil) region2 = waitTransferLeader(c, stream, region2, 1) - dispatchHeartbeat(c, co, region2, stream) + c.Assert(dispatchHeartbeat(c, co, region2, stream), IsNil) waitNoResponse(c, stream) waitOperator(c, co, 3) region3 := tc.GetRegion(3) - dispatchHeartbeat(c, co, region3, stream) + c.Assert(dispatchHeartbeat(c, co, region3, stream), IsNil) region3 = waitTransferLeader(c, stream, region3, 1) - dispatchHeartbeat(c, co, region3, stream) + c.Assert(dispatchHeartbeat(c, co, region3, stream), IsNil) waitNoResponse(c, stream) } func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { - cfg, opt := newTestScheduleConfig() + cfg, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cfg.ReplicaScheduleLimit = 0 tc := newTestClusterInfo(opt) @@ -559,8 +573,8 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { co.run() // Add stores 1,2 - tc.addLeaderStore(1, 1) - tc.addLeaderStore(2, 1) + c.Assert(tc.addLeaderStore(1, 1), IsNil) + c.Assert(tc.addLeaderStore(2, 1), IsNil) c.Assert(co.schedulers, HasLen, 4) oc := co.opController @@ -582,13 +596,14 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { co.wg.Wait() // make a new coordinator for testing // whether the schedulers added or removed in dynamic way are recorded in opt - _, newOpt := newTestScheduleConfig() + _, newOpt, err := newTestScheduleConfig() + c.Assert(err, IsNil) _, err = schedule.CreateScheduler("adjacent-region", oc) c.Assert(err, IsNil) // suppose we add a new default enable scheduler newOpt.AddSchedulerCfg("adjacent-region", []string{}) c.Assert(newOpt.GetSchedulers(), HasLen, 5) - newOpt.reload(co.cluster.kv) + c.Assert(newOpt.reload(co.cluster.kv), IsNil) c.Assert(newOpt.GetSchedulers(), HasLen, 7) tc.clusterInfo.opt = newOpt @@ -598,8 +613,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { co.stop() co.wg.Wait() // suppose restart PD again - _, newOpt = newTestScheduleConfig() - newOpt.reload(tc.kv) + _, newOpt, err = newTestScheduleConfig() + c.Assert(err, IsNil) + c.Assert(newOpt.reload(tc.kv), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() @@ -622,8 +638,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { co.stop() co.wg.Wait() - _, newOpt = newTestScheduleConfig() - newOpt.reload(co.cluster.kv) + _, newOpt, err = newTestScheduleConfig() + c.Assert(err, IsNil) + c.Assert(newOpt.reload(co.cluster.kv), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -637,7 +654,8 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { func (s *testCoordinatorSuite) TestRestart(c *C) { // Turn off balance, we test add replica only. - cfg, opt := newTestScheduleConfig() + cfg, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) cfg.LeaderScheduleLimit = 0 cfg.RegionScheduleLimit = 0 @@ -646,10 +664,10 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { defer hbStreams.Close() // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. 
- tc.addRegionStore(1, 1) - tc.addRegionStore(2, 2) - tc.addRegionStore(3, 3) - tc.addLeaderRegion(1, 1) + c.Assert(tc.addRegionStore(1, 1), IsNil) + c.Assert(tc.addRegionStore(2, 2), IsNil) + c.Assert(tc.addRegionStore(3, 3), IsNil) + c.Assert(tc.addLeaderRegion(1, 1), IsNil) region := tc.GetRegion(1) tc.prepareChecker.collect(region) @@ -657,9 +675,9 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() stream := newMockHeartbeatStream() - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 2) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitPromoteLearner(c, stream, region, 2) co.stop() co.wg.Wait() @@ -667,9 +685,9 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { // Recreate coodinator then add another replica on store 3. co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 3) - dispatchHeartbeat(c, co, region, stream) + c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) waitPromoteLearner(c, stream, region, 3) co.stop() co.wg.Wait() @@ -686,7 +704,8 @@ var _ = Suite(&testOperatorControllerSuite{}) type testOperatorControllerSuite struct{} func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := schedule.NewMockHeartbeatStreams(tc.clusterInfo.getClusterID()) @@ -694,8 +713,8 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0)) c.Assert(oc.OperatorCount(schedule.OpRegion), Equals, uint64(0)) - tc.addLeaderRegion(1, 1) - tc.addLeaderRegion(2, 2) + c.Assert(tc.addLeaderRegion(1, 1), IsNil) + c.Assert(tc.addLeaderRegion(2, 2), IsNil) op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader) oc.AddOperator(op1) c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 1:leader @@ -733,13 +752,14 @@ func (s *mockLimitScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool { } func (s *testScheduleControllerSuite) TestController(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() - tc.addLeaderRegion(1, 1) - tc.addLeaderRegion(2, 2) + c.Assert(tc.addLeaderRegion(1, 1), IsNil) + c.Assert(tc.addLeaderRegion(2, 2), IsNil) co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) oc := co.opController @@ -810,7 +830,8 @@ func (s *testScheduleControllerSuite) TestController(c *C) { } func (s *testScheduleControllerSuite) TestInterval(c *C) { - _, opt := newTestScheduleConfig() + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) tc := newTestClusterInfo(opt) hbStreams := newHeartbeatStreams(tc.getClusterID()) defer hbStreams.Close() diff --git a/server/core/kv.go b/server/core/kv.go index 68bb22ec0e4..00df7bd311b 100644 --- a/server/core/kv.go +++ b/server/core/kv.go @@ -235,7 +235,7 @@ func (kv *KV) loadFloatWithDefaultValue(path string, def float64) (float64, erro // Flush flushes the dirty region to storage. 
func (kv *KV) Flush() error { if kv.regionKV != nil { - kv.regionKV.FlushRegion() + return kv.regionKV.FlushRegion() } return nil } diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go index bc14230ae87..c517cee3c11 100644 --- a/server/heartbeat_stream_test.go +++ b/server/heartbeat_stream_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/pkg/typeutil" + log "github.com/sirupsen/logrus" ) var _ = Suite(&testHeartbeatStreamSuite{}) @@ -31,29 +32,26 @@ type testHeartbeatStreamSuite struct { region *metapb.Region } -func (s *testHeartbeatStreamSuite) SetUpSuite(c *C) { +func (s *testHeartbeatStreamSuite) TestActivity(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) c.Assert(err, IsNil) s.svr.cfg.heartbeatStreamBindInterval = typeutil.NewDuration(time.Second) mustWaitLeader(c, []*Server{s.svr}) s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) + defer cleanup() bootstrapReq := s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0") _, err = s.svr.bootstrapCluster(bootstrapReq) c.Assert(err, IsNil) s.region = bootstrapReq.Region -} - -func (s *testHeartbeatStreamSuite) TearDownSuite(c *C) { - s.cleanup() -} -func (s *testHeartbeatStreamSuite) TestActivity(c *C) { // Add a new store and an addPeer operator. storeID, err := s.svr.idAlloc.Alloc() c.Assert(err, IsNil) - putStore(c, s.grpcPDClient, s.svr.clusterID, &metapb.Store{Id: storeID, Address: "127.0.0.1:1"}) + _, err = putStore(c, s.grpcPDClient, s.svr.clusterID, &metapb.Store{Id: storeID, Address: "127.0.0.1:1"}) + c.Assert(err, IsNil) err = newHandler(s.svr).AddAddPeerOperator(s.region.GetId(), storeID) c.Assert(err, IsNil) @@ -77,14 +75,14 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) { Region: s.region, } // Active stream is stream1. - stream1.stream.Send(req) + c.Assert(stream1.stream.Send(req), IsNil) c.Assert(checkActiveStream(), Equals, 1) // Rebind to stream2. - stream2.stream.Send(req) + c.Assert(stream2.stream.Send(req), IsNil) c.Assert(checkActiveStream(), Equals, 2) // Rebind to stream1 if no more heartbeats sent through stream2. 
testutil.WaitUntil(c, func(c *C) bool { - stream1.stream.Send(req) + c.Assert(stream1.stream.Send(req), IsNil) return checkActiveStream() == 1 }) } @@ -114,11 +112,15 @@ func newRegionheartbeatClient(c *C, grpcClient pdpb.PDClient) *regionHeartbeatCl } func (c *regionHeartbeatClient) close() { - c.stream.CloseSend() + if err := c.stream.CloseSend(); err != nil { + log.Errorf("Failed to terminate client stream: %v", err) + } } func (c *regionHeartbeatClient) SendRecv(msg *pdpb.RegionHeartbeatRequest, timeout time.Duration) *pdpb.RegionHeartbeatResponse { - c.stream.Send(msg) + if err := c.stream.Send(msg); err != nil { + log.Errorf("send heartbeat message fail: %v", err) + } select { case <-time.After(timeout): return nil diff --git a/server/id_test.go b/server/id_test.go index ae0904e797f..b2bbf3a1b4e 100644 --- a/server/id_test.go +++ b/server/id_test.go @@ -34,7 +34,7 @@ type testAllocIDSuite struct { func (s *testAllocIDSuite) SetUpSuite(c *C) { var err error - _, s.svr, s.cleanup, err = NewTestServer() + _, s.svr, s.cleanup, err = NewTestServer(c) c.Assert(err, IsNil) s.client = s.svr.client s.alloc = s.svr.idAlloc diff --git a/server/join_test.go b/server/join_test.go index 32a0fe36a07..dc47605dd81 100644 --- a/server/join_test.go +++ b/server/join_test.go @@ -23,7 +23,7 @@ type testJoinServerSuite struct{} // A PD joins itself. func (s *testJoinServerSuite) TestPDJoinsItself(c *C) { - cfg := NewTestSingleConfig() + cfg := NewTestSingleConfig(c) cfg.Join = cfg.AdvertiseClientUrls c.Assert(PrepareJoinCluster(cfg), NotNil) } diff --git a/server/leader_test.go b/server/leader_test.go index 5537b4141bc..af2c184c8aa 100644 --- a/server/leader_test.go +++ b/server/leader_test.go @@ -31,8 +31,7 @@ type testGetLeaderSuite struct { } func (s *testGetLeaderSuite) SetUpSuite(c *C) { - cfg := NewTestSingleConfig() - + cfg := NewTestSingleConfig(c) s.wg.Add(1) s.done = make(chan bool) svr, err := CreateServer(cfg, nil) @@ -77,7 +76,7 @@ func (s *testGetLeaderSuite) sendRequest(c *C, addr string) { // just make sure the server will not panic. grpcPDClient := mustNewGrpcClient(c, addr) if grpcPDClient != nil { - grpcPDClient.AllocID(context.Background(), req) + _, _ = grpcPDClient.AllocID(context.Background(), req) } } time.Sleep(10 * time.Millisecond) diff --git a/server/namespace_test.go b/server/namespace_test.go index b8207c3883e..d29d5a1d438 100644 --- a/server/namespace_test.go +++ b/server/namespace_test.go @@ -31,8 +31,10 @@ type testNamespaceSuite struct { } func (s *testNamespaceSuite) SetUpTest(c *C) { + var err error s.classifier = newMapClassifer() - s.scheduleConfig, s.opt = newTestScheduleConfig() + s.scheduleConfig, s.opt, err = newTestScheduleConfig() + c.Assert(err, IsNil) s.tc = newTestClusterInfo(s.opt) } @@ -41,9 +43,9 @@ func (s *testNamespaceSuite) TestReplica(c *C) { // 1 0 ns1 // 2 10 ns1 // 3 0 ns2 - s.tc.addRegionStore(1, 0) - s.tc.addRegionStore(2, 10) - s.tc.addRegionStore(3, 0) + c.Assert(s.tc.addRegionStore(1, 0), IsNil) + c.Assert(s.tc.addRegionStore(2, 10), IsNil) + c.Assert(s.tc.addRegionStore(3, 0), IsNil) s.classifier.setStore(1, "ns1") s.classifier.setStore(2, "ns1") s.classifier.setStore(3, "ns2") @@ -52,15 +54,15 @@ func (s *testNamespaceSuite) TestReplica(c *C) { // Replica should be added to the store with the same namespace. 
s.classifier.setRegion(1, "ns1") - s.tc.addLeaderRegion(1, 1) + c.Assert(s.tc.addLeaderRegion(1, 1), IsNil) op := checker.Check(s.tc.GetRegion(1)) testutil.CheckAddPeer(c, op, schedule.OpReplica, 2) - s.tc.addLeaderRegion(1, 3) + c.Assert(s.tc.addLeaderRegion(1, 3), IsNil) op = checker.Check(s.tc.GetRegion(1)) testutil.CheckAddPeer(c, op, schedule.OpReplica, 1) // Stop adding replica if no store in the same namespace. - s.tc.addLeaderRegion(1, 1, 2) + c.Assert(s.tc.addLeaderRegion(1, 1, 2), IsNil) op = checker.Check(s.tc.GetRegion(1)) c.Assert(op, IsNil) } @@ -70,9 +72,9 @@ func (s *testNamespaceSuite) TestNamespaceChecker(c *C) { // 1 0 ns1 // 2 10 ns1 // 3 0 ns2 - s.tc.addRegionStore(1, 0) - s.tc.addRegionStore(2, 10) - s.tc.addRegionStore(3, 0) + c.Assert(s.tc.addRegionStore(1, 0), IsNil) + c.Assert(s.tc.addRegionStore(2, 10), IsNil) + c.Assert(s.tc.addRegionStore(3, 0), IsNil) s.classifier.setStore(1, "ns1") s.classifier.setStore(2, "ns1") s.classifier.setStore(3, "ns2") @@ -81,15 +83,15 @@ func (s *testNamespaceSuite) TestNamespaceChecker(c *C) { // Move the region if it was not in the right store. s.classifier.setRegion(1, "ns2") - s.tc.addLeaderRegion(1, 1) + c.Assert(s.tc.addLeaderRegion(1, 1), IsNil) op := checker.Check(s.tc.GetRegion(1)) testutil.CheckTransferPeer(c, op, schedule.OpReplica, 1, 3) // Only move one region if the one was in the right store while the other was not. s.classifier.setRegion(2, "ns1") - s.tc.addLeaderRegion(2, 1) + c.Assert(s.tc.addLeaderRegion(2, 1), IsNil) s.classifier.setRegion(3, "ns2") - s.tc.addLeaderRegion(3, 2) + c.Assert(s.tc.addLeaderRegion(3, 2), IsNil) op = checker.Check(s.tc.GetRegion(2)) c.Assert(op, IsNil) op = checker.Check(s.tc.GetRegion(3)) @@ -97,13 +99,13 @@ func (s *testNamespaceSuite) TestNamespaceChecker(c *C) { // Do NOT move the region if it was in the right store. s.classifier.setRegion(4, "ns2") - s.tc.addLeaderRegion(4, 3) + c.Assert(s.tc.addLeaderRegion(4, 3), IsNil) op = checker.Check(s.tc.GetRegion(4)) c.Assert(op, IsNil) // Move the peer with questions to the right store if the region has multiple peers. s.classifier.setRegion(5, "ns1") - s.tc.addLeaderRegion(5, 1, 1, 3) + c.Assert(s.tc.addLeaderRegion(5, 1, 1, 3), IsNil) s.scheduleConfig.DisableNamespaceRelocation = true c.Assert(checker.Check(s.tc.GetRegion(5)), IsNil) @@ -118,9 +120,9 @@ func (s *testNamespaceSuite) TestSchedulerBalanceRegion(c *C) { // 1 0 ns1 // 2 100 ns1 // 3 200 ns2 - s.tc.addRegionStore(1, 0) - s.tc.addRegionStore(2, 100) - s.tc.addRegionStore(3, 200) + c.Assert(s.tc.addRegionStore(1, 0), IsNil) + c.Assert(s.tc.addRegionStore(2, 100), IsNil) + c.Assert(s.tc.addRegionStore(3, 200), IsNil) s.classifier.setStore(1, "ns1") s.classifier.setStore(2, "ns1") s.classifier.setStore(3, "ns2") @@ -130,13 +132,13 @@ func (s *testNamespaceSuite) TestSchedulerBalanceRegion(c *C) { sched, _ := schedule.CreateScheduler("balance-region", oc) // Balance is limited within a namespace. - s.tc.addLeaderRegion(1, 2) + c.Assert(s.tc.addLeaderRegion(1, 2), IsNil) s.classifier.setRegion(1, "ns1") op := scheduleByNamespace(s.tc, s.classifier, sched) testutil.CheckTransferPeer(c, op[0], schedule.OpBalance, 2, 1) // If no more store in the namespace, balance stops. 
-	s.tc.addLeaderRegion(1, 3)
+	c.Assert(s.tc.addLeaderRegion(1, 3), IsNil)
 	s.classifier.setRegion(1, "ns2")
 	op = scheduleByNamespace(s.tc, s.classifier, sched)
 	c.Assert(op, IsNil)
@@ -144,9 +146,9 @@ func (s *testNamespaceSuite) TestSchedulerBalanceRegion(c *C) {
 	// If region is not in the correct namespace, it will not be balanced. The
 	// region should be in 'ns1', but its replica is located in 'ns2', neither
 	// namespace will select it for balance.
-	s.tc.addRegionStore(4, 0)
+	c.Assert(s.tc.addRegionStore(4, 0), IsNil)
 	s.classifier.setStore(4, "ns2")
-	s.tc.addLeaderRegion(1, 3)
+	c.Assert(s.tc.addLeaderRegion(1, 3), IsNil)
 	s.classifier.setRegion(1, "ns1")
 	op = scheduleByNamespace(s.tc, s.classifier, sched)
 	c.Assert(op, IsNil)
@@ -158,10 +160,10 @@ func (s *testNamespaceSuite) TestSchedulerBalanceLeader(c *C) {
 	// 2 200 ns1
 	// 3 0 ns2
 	// 4 300 ns2
-	s.tc.addLeaderStore(1, 100)
-	s.tc.addLeaderStore(2, 200)
-	s.tc.addLeaderStore(3, 0)
-	s.tc.addLeaderStore(4, 300)
+	c.Assert(s.tc.addLeaderStore(1, 100), IsNil)
+	c.Assert(s.tc.addLeaderStore(2, 200), IsNil)
+	c.Assert(s.tc.addLeaderStore(3, 0), IsNil)
+	c.Assert(s.tc.addLeaderStore(4, 300), IsNil)
 	s.classifier.setStore(1, "ns1")
 	s.classifier.setStore(2, "ns1")
 	s.classifier.setStore(3, "ns2")
@@ -171,13 +173,13 @@ func (s *testNamespaceSuite) TestSchedulerBalanceLeader(c *C) {
 	sched, _ := schedule.CreateScheduler("balance-leader", oc)
 
 	// Balance is limited within a namespace.
-	s.tc.addLeaderRegion(1, 2, 1)
+	c.Assert(s.tc.addLeaderRegion(1, 2, 1), IsNil)
 	s.classifier.setRegion(1, "ns1")
 	op := scheduleByNamespace(s.tc, s.classifier, sched)
 	testutil.CheckTransferLeader(c, op[0], schedule.OpBalance, 2, 1)
 
 	// If region is not in the correct namespace, it will not be balanced.
-	s.tc.addLeaderRegion(1, 4, 1)
+	c.Assert(s.tc.addLeaderRegion(1, 4, 1), IsNil)
 	s.classifier.setRegion(1, "ns1")
 	op = scheduleByNamespace(s.tc, s.classifier, sched)
 	c.Assert(op, IsNil)
diff --git a/server/option.go b/server/option.go
index 665ca009387..98f83b84b2b 100644
--- a/server/option.go
+++ b/server/option.go
@@ -176,7 +176,7 @@ func (o *scheduleOption) GetSchedulers() SchedulerConfigs {
 	return o.load().Schedulers
 }
 
-func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) error {
+func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) {
 	c := o.load()
 	v := c.clone()
 	for i, schedulerCfg := range v.Schedulers {
@@ -184,19 +184,18 @@ func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) error {
 		// such as two schedulers of type "evict-leader",
 		// one name is "evict-leader-scheduler-1" and the other is "evict-leader-scheduler-2"
 		if reflect.DeepEqual(schedulerCfg, SchedulerConfig{Type: tp, Args: args, Disable: false}) {
-			return nil
+			return
 		}
 
 		if reflect.DeepEqual(schedulerCfg, SchedulerConfig{Type: tp, Args: args, Disable: true}) {
 			schedulerCfg.Disable = false
 			v.Schedulers[i] = schedulerCfg
 			o.store(v)
-			return nil
+			return
 		}
 	}
 	v.Schedulers = append(v.Schedulers, SchedulerConfig{Type: tp, Args: args, Disable: false})
 	o.store(v)
-	return nil
 }
 
 func (o *scheduleOption) RemoveSchedulerCfg(name string) error {
diff --git a/server/region_statistics_test.go b/server/region_statistics_test.go
index f8bbb3e3103..035d566d556 100644
--- a/server/region_statistics_test.go
+++ b/server/region_statistics_test.go
@@ -66,7 +66,8 @@ var _ = Suite(&testRegionStatisticsSuite{})
 type testRegionStatisticsSuite struct{}
 
 func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) {
-	_, opt := newTestScheduleConfig()
+	_, opt, err := newTestScheduleConfig()
+	c.Assert(err, IsNil)
 	peers := []*metapb.Peer{
 		{Id: 5, StoreId: 1},
 		{Id: 6, StoreId: 2},
diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go
index 1e1cdd4a419..2de1b6c3496 100644
--- a/server/region_syncer/client.go
+++ b/server/region_syncer/client.go
@@ -112,7 +112,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 			resp, err := client.Recv()
 			if err != nil {
 				log.Error("region sync with leader meet error:", err)
-				client.CloseSend()
+				if err = client.CloseSend(); err != nil {
+					log.Errorf("Failed to terminate client stream: %v", err)
+				}
 				time.Sleep(time.Second)
 				break
 			}
diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go
index a9bd74f5856..f071c21104f 100644
--- a/server/region_syncer/server.go
+++ b/server/region_syncer/server.go
@@ -177,7 +177,9 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
 		}
 		s.limit.Wait(int64(resp.Size()))
 		lastIndex += len(res)
-		stream.Send(resp)
+		if err := stream.Send(resp); err != nil {
+			log.Errorf("failed to send sync region response, error: %v", err)
+		}
 		res = res[:0]
 	}
 	log.Infof("%s has completed full synchronization with %s, spend %v", name, s.server.Name(), time.Since(start))
diff --git a/server/schedule/basic_cluster.go b/server/schedule/basic_cluster.go
index 66112d8f9ff..8c202c59758 100644
--- a/server/schedule/basic_cluster.go
+++ b/server/schedule/basic_cluster.go
@@ -138,15 +138,13 @@ func (bc *BasicCluster) RegionReadStats() []*core.RegionStat {
 }
 
 // PutStore put a store
-func (bc *BasicCluster) PutStore(store *core.StoreInfo) error {
+func (bc *BasicCluster) PutStore(store *core.StoreInfo) {
 	bc.Stores.SetStore(store)
-	return nil
 }
 
 // PutRegion put a region
-func (bc *BasicCluster) PutRegion(region *core.RegionInfo) error {
+func (bc *BasicCluster) PutRegion(region *core.RegionInfo) {
 	bc.Regions.SetRegion(region)
-	return nil
 }
 
 // CheckWriteStatus checks the write status, returns whether need update statistics and item.
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index 242b2c27f5c..63923e8750f 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -913,7 +913,7 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) {
 	}
 
 	for _, region := range s.regions {
-		c.Assert(s.cluster.PutRegion(region), IsNil)
+		s.cluster.PutRegion(region)
 	}
 
 	s.mc = schedule.NewMergeChecker(s.cluster, namespace.DefaultClassifier)
@@ -1278,7 +1278,8 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
 		tc.Regions.SetRegion(regionInfo)
 	}
 	for i := 0; i < 100; i++ {
-		tc.AllocPeer(1)
+		_, err := tc.AllocPeer(1)
+		c.Assert(err, IsNil)
 	}
 	for i := 1; i <= 5; i++ {
 		tc.UpdateStoreStatus(uint64(i))
diff --git a/server/server.go b/server/server.go
index 701cba49fa2..6f957134a57 100644
--- a/server/server.go
+++ b/server/server.go
@@ -589,27 +589,35 @@ func (s *Server) GetNamespaceConfigWithAdjust(name string) *NamespaceConfig {
 }
 
 // SetNamespaceConfig sets the namespace config.
-func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) {
+func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error {
 	if n, ok := s.scheduleOpt.ns[name]; ok {
 		old := s.scheduleOpt.ns[name].load()
 		n.store(&cfg)
-		s.scheduleOpt.persist(s.kv)
+		if err := s.scheduleOpt.persist(s.kv); err != nil {
+			return err
+		}
 		log.Infof("namespace:%v config is updated: %+v, old: %+v", name, cfg, old)
 	} else {
 		s.scheduleOpt.ns[name] = newNamespaceOption(&cfg)
-		s.scheduleOpt.persist(s.kv)
+		if err := s.scheduleOpt.persist(s.kv); err != nil {
+			return err
+		}
 		log.Infof("namespace:%v config is added: %+v", name, cfg)
 	}
+	return nil
 }
 
 // DeleteNamespaceConfig deletes the namespace config.
-func (s *Server) DeleteNamespaceConfig(name string) {
+func (s *Server) DeleteNamespaceConfig(name string) error {
 	if n, ok := s.scheduleOpt.ns[name]; ok {
 		cfg := n.load()
 		delete(s.scheduleOpt.ns, name)
-		s.scheduleOpt.persist(s.kv)
+		if err := s.scheduleOpt.persist(s.kv); err != nil {
+			return err
+		}
 		log.Infof("namespace:%v config is deleted: %+v", name, *cfg)
 	}
+	return nil
 }
 
 // SetLabelProperty inserts a label property config.
diff --git a/server/server_test.go b/server/server_test.go
index 8b8c14167d2..8378e7c8f0b 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -29,7 +29,7 @@ func TestServer(t *testing.T) {
 
 func mustRunTestServer(c *C) (*Server, CleanupFunc) {
 	var err error
-	_, server, cleanup, err := NewTestServer()
+	_, server, cleanup, err := NewTestServer(c)
 	c.Assert(err, IsNil)
 	mustWaitLeader(c, []*Server{server})
 	return server, cleanup
@@ -59,7 +59,7 @@ type testLeaderServerSuite struct {
 
 func (s *testLeaderServerSuite) SetUpSuite(c *C) {
 	s.svrs = make(map[string]*Server)
-	cfgs := NewTestMultiConfig(3)
+	cfgs := NewTestMultiConfig(c, 3)
 
 	ch := make(chan *Server, 3)
 	for i := 0; i < 3; i++ {
@@ -124,7 +124,7 @@ func newTestServersWithCfgs(c *C, cfgs []*Config) ([]*Server, CleanupFunc) {
 }
 
 func (s *testServerSuite) TestCheckClusterID(c *C) {
-	cfgs := NewTestMultiConfig(2)
+	cfgs := NewTestMultiConfig(c, 2)
 	for i, cfg := range cfgs {
 		cfg.DataDir = fmt.Sprintf("/tmp/test_pd_check_clusterID_%d", i)
 		// Clean up before testing.
diff --git a/server/store_statistics_test.go b/server/store_statistics_test.go
index 0a738e12459..be7987d61fa 100644
--- a/server/store_statistics_test.go
+++ b/server/store_statistics_test.go
@@ -27,7 +27,8 @@ var _ = Suite(&testStoreStatisticsSuite{})
 
 type testStoreStatisticsSuite struct{}
 
 func (t *testStoreStatisticsSuite) TestStoreStatistics(c *C) {
-	_, opt := newTestScheduleConfig()
+	_, opt, err := newTestScheduleConfig()
+	c.Assert(err, IsNil)
 	rep := opt.GetReplication().load()
 	rep.LocationLabels = []string{"zone", "host"}
diff --git a/server/testutil.go b/server/testutil.go
index 4d9e9e25054..b72209a22fa 100644
--- a/server/testutil.go
+++ b/server/testutil.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/embed"
+	"github.com/pingcap/check"
 	"github.com/pingcap/pd/pkg/tempurl"
 	"github.com/pingcap/pd/pkg/typeutil"
 
@@ -38,8 +39,8 @@ func cleanServer(cfg *Config) {
 }
 
 // NewTestServer creates a pd server for testing.
-func NewTestServer() (*Config, *Server, CleanupFunc, error) {
-	cfg := NewTestSingleConfig()
+func NewTestServer(c *check.C) (*Config, *Server, CleanupFunc, error) {
+	cfg := NewTestSingleConfig(c)
 	s, err := CreateServer(cfg, nil)
 	if err != nil {
 		return nil, nil, nil, err
@@ -57,7 +58,7 @@ func NewTestServer() (*Config, *Server, CleanupFunc, error) {
 
 // NewTestSingleConfig is only for test to create one pd.
 // Because PD client also needs this, so export here.
-func NewTestSingleConfig() *Config {
+func NewTestSingleConfig(c *check.C) *Config {
 	cfg := &Config{
 		Name: "pd",
 		ClientUrls: tempurl.Alloc(),
@@ -78,19 +79,19 @@ func NewTestSingleConfig() *Config {
 	cfg.ElectionInterval = typeutil.NewDuration(3 * time.Second)
 	cfg.LeaderPriorityCheckInterval = typeutil.NewDuration(100 * time.Millisecond)
 
-	cfg.Adjust(nil)
+	c.Assert(cfg.Adjust(nil), check.IsNil)
 
 	return cfg
 }
 
 // NewTestMultiConfig is only for test to create multiple pd configurations.
 // Because PD client also needs this, so export here.
-func NewTestMultiConfig(count int) []*Config {
+func NewTestMultiConfig(c *check.C, count int) []*Config {
 	cfgs := make([]*Config, count)
 
 	clusters := []string{}
 	for i := 1; i <= count; i++ {
-		cfg := NewTestSingleConfig()
+		cfg := NewTestSingleConfig(c)
 		cfg.Name = fmt.Sprintf("pd%d", i)
 
 		clusters = append(clusters, fmt.Sprintf("%s=%s", cfg.Name, cfg.PeerUrls))
diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go
index 049f23046f7..121cb7eb437 100644
--- a/table/namespace_classifier_test.go
+++ b/table/namespace_classifier_test.go
@@ -66,8 +66,8 @@ func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier
 		Meta: true,
 	}
 
-	tableClassifier.putNamespaceLocked(&testNamespace1)
-	tableClassifier.putNamespaceLocked(&testNamespace2)
+	c.Assert(tableClassifier.putNamespaceLocked(&testNamespace1), IsNil)
+	c.Assert(tableClassifier.putNamespaceLocked(&testNamespace2), IsNil)
 
 	return tableClassifier
 }
diff --git a/tests/cluster.go b/tests/cluster.go
index 37191c31d9e..57eceba3e38 100644
--- a/tests/cluster.go
+++ b/tests/cluster.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/pd/server/api"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
 )
 
 // TestServer states.
@@ -101,7 +102,9 @@ func (s *TestServer) Destroy() error {
 	if s.state == Running {
 		s.server.Close()
 	}
-	os.RemoveAll(s.server.GetConfig().DataDir)
+	if err := os.RemoveAll(s.server.GetConfig().DataDir); err != nil {
+		return err
+	}
 	s.state = Destroy
 	return nil
 }
@@ -423,12 +426,11 @@ func (c *TestCluster) Join() (*TestServer, error) {
 }
 
 // Destroy is used to destroy a TestCluster.
-func (c *TestCluster) Destroy() error {
+func (c *TestCluster) Destroy() {
 	for _, s := range c.servers {
 		err := s.Destroy()
 		if err != nil {
-			return err
+			log.Errorf("failed to destroy the cluster: %v", err)
 		}
 	}
-	return nil
 }
diff --git a/tests/cmd/pdctl_test.go b/tests/cmd/pdctl_test.go
index 9b7345877bc..3e7abb6c2b6 100644
--- a/tests/cmd/pdctl_test.go
+++ b/tests/cmd/pdctl_test.go
@@ -471,7 +471,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err := executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo := api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions := leaderServer.GetRegions()
 	checkRegionsInfo(c, regionsInfo, regions)
 
@@ -480,7 +480,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionInfo := api.RegionInfo{}
-	json.Unmarshal(output, &regionInfo)
+	c.Assert(json.Unmarshal(output, &regionInfo), IsNil)
 	region := leaderServer.GetRegionInfoByID(1)
 	c.Assert(api.NewRegionInfo(region), DeepEquals, &regionInfo)
 
@@ -489,7 +489,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	region = leaderServer.GetRegionInfoByID(2)
 	regions = leaderServer.GetAdjacentRegions(region)
 	checkRegionsInfo(c, regionsInfo, regions)
 
@@ -499,7 +499,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = leaderServer.GetStoreRegions(1)
 	checkRegionsInfo(c, regionsInfo, regions)
 
@@ -508,7 +508,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)
 	checkRegionsInfo(c, regionsInfo, regions)
 
@@ -517,7 +517,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)
 	checkRegionsInfo(c, regionsInfo, regions)
 
@@ -526,7 +526,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
 		return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
 	}, 2)
@@ -537,7 +537,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
 		return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
 	}, 2)
@@ -548,7 +548,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	regions = api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
 		return a.GetApproximateSize() < b.GetApproximateSize()
 	}, 2)
@@ -559,7 +559,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r1})
 
 	// region check miss-peer command
@@ -567,7 +567,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r2, r3, r4})
 
 	// region check pending-peer command
@@ -575,7 +575,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r3})
 
 	// region check down-peer command
@@ -583,7 +583,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r3})
 
 	// region key --format=raw command
@@ -591,7 +591,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionInfo = api.RegionInfo{}
-	json.Unmarshal(output, &regionInfo)
+	c.Assert(json.Unmarshal(output, &regionInfo), IsNil)
 	c.Assert(&regionInfo, DeepEquals, api.NewRegionInfo(r2))
 
 	// region key --format=hex command
@@ -599,7 +599,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionInfo = api.RegionInfo{}
-	json.Unmarshal(output, &regionInfo)
+	c.Assert(json.Unmarshal(output, &regionInfo), IsNil)
 	c.Assert(&regionInfo, DeepEquals, api.NewRegionInfo(r2))
 
 	// region startkey --format=raw command
@@ -607,7 +607,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r2, r3})
 
 	// region startkey --format=hex command
@@ -615,7 +615,7 @@ func (s *cmdTestSuite) TestRegion(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	regionsInfo = api.RegionsInfo{}
-	json.Unmarshal(output, &regionsInfo)
+	c.Assert(json.Unmarshal(output, &regionsInfo), IsNil)
 	checkRegionsInfo(c, regionsInfo, []*core.RegionInfo{r3, r4})
 }
 
@@ -645,7 +645,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err := executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	scheduleCfg := server.ScheduleConfig{}
-	json.Unmarshal(output, &scheduleCfg)
+	c.Assert(json.Unmarshal(output, &scheduleCfg), IsNil)
 	c.Assert(&scheduleCfg, DeepEquals, svr.GetScheduleConfig())
 
 	// config show replication
@@ -653,7 +653,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args...)
 	c.Assert(err, IsNil)
 	replicationCfg := server.ReplicationConfig{}
-	json.Unmarshal(output, &replicationCfg)
+	c.Assert(json.Unmarshal(output, &replicationCfg), IsNil)
 	c.Assert(&replicationCfg, DeepEquals, svr.GetReplicationConfig())
 
 	// config show cluster-version
@@ -661,7 +661,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	clusterVersion := semver.Version{}
-	json.Unmarshal(output, &clusterVersion)
+	c.Assert(json.Unmarshal(output, &clusterVersion), IsNil)
 	c.Assert(clusterVersion, DeepEquals, svr.GetClusterVersion())
 
 	// config set cluster-version
@@ -672,7 +672,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	clusterVersion = semver.Version{}
-	json.Unmarshal(output, &clusterVersion)
+	c.Assert(json.Unmarshal(output, &clusterVersion), IsNil)
 	c.Assert(clusterVersion, DeepEquals, svr.GetClusterVersion())
 
 	// config show namespace && config set namespace
@@ -686,7 +686,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	namespaceCfg := server.NamespaceConfig{}
-	json.Unmarshal(output, &namespaceCfg)
+	c.Assert(json.Unmarshal(output, &namespaceCfg), IsNil)
 	args2 = []string{"-u", pdAddr, "config", "set", "namespace", "ts1", "region-schedule-limit", "128"}
 	_, _, err = executeCommandC(cmd, args2...)
 	c.Assert(err, IsNil)
@@ -694,7 +694,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	namespaceCfg = server.NamespaceConfig{}
-	json.Unmarshal(output, &namespaceCfg)
+	c.Assert(json.Unmarshal(output, &namespaceCfg), IsNil)
 	c.Assert(namespaceCfg.RegionScheduleLimit, Equals, svr.GetNamespaceConfig("ts1").RegionScheduleLimit)
 
 	// config delete namespace
@@ -704,7 +704,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	namespaceCfg = server.NamespaceConfig{}
-	json.Unmarshal(output, &namespaceCfg)
+	c.Assert(json.Unmarshal(output, &namespaceCfg), IsNil)
 	c.Assert(namespaceCfg.RegionScheduleLimit, Not(Equals), svr.GetNamespaceConfig("ts1").RegionScheduleLimit)
 
 	// config show label-property
@@ -712,7 +712,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	labelPropertyCfg := server.LabelPropertyConfig{}
-	json.Unmarshal(output, &labelPropertyCfg)
+	c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil)
 	c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty())
 
 	// config set label-property
@@ -723,7 +723,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	labelPropertyCfg = server.LabelPropertyConfig{}
-	json.Unmarshal(output, &labelPropertyCfg)
+	c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil)
 	c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty())
 
 	// config delete label-property
@@ -734,7 +734,7 @@ func (s *cmdTestSuite) TestConfig(c *C) {
 	_, output, err = executeCommandC(cmd, args1...)
 	c.Assert(err, IsNil)
 	labelPropertyCfg = server.LabelPropertyConfig{}
-	json.Unmarshal(output, &labelPropertyCfg)
+	c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil)
 	c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty())
 
 	// config set