diff --git a/CHANGELOG.md b/CHANGELOG.md index 940cdaa5f9c2..fca94927c585 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # PD Change Log +## v2.1.3 +- Fix the `Watch` issue about leader election [#1404](https://github.com/pingcap/pd/pull/1404) + +## v2.1.2 +- Fix the Region information update issue about Region merge [#1377](https://github.com/pingcap/pd/pull/1377) + ## v2.1.1 - Fix the issue that some configuration items cannot be set to `0` in the configuration file [#1334](https://github.com/pingcap/pd/pull/1334) - Check the undefined configuration when starting PD [#1362](https://github.com/pingcap/pd/pull/1362) diff --git a/pkg/integration_test/leader_watch_test.go b/pkg/integration_test/leader_watch_test.go index 350a4f124772..9c88c814de9a 100644 --- a/pkg/integration_test/leader_watch_test.go +++ b/pkg/integration_test/leader_watch_test.go @@ -25,7 +25,7 @@ import ( func (s *integrationTestSuite) TestWatcher(c *C) { c.Parallel() - cluster, err := newTestCluster(1) + cluster, err := newTestCluster(1, func(conf *server.Config) { conf.AutoCompactionRetention = "1s" }) c.Assert(err, IsNil) defer cluster.Destroy() diff --git a/server/cluster.go b/server/cluster.go index 9983a6f344ef..bd3ac987a7e4 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/pd/pkg/error_code" + errcode "github.com/pingcap/pd/pkg/error_code" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" @@ -459,20 +459,21 @@ func (c *RaftCluster) checkStores() { cluster := c.cachedCluster for _, store := range cluster.GetStores() { - if store.GetState() != metapb.StoreState_Offline { - if store.GetState() == metapb.StoreState_Up && !store.IsLowSpace(cluster.GetLowSpaceRatio()) { - upStoreCount++ - continue - } + // the store has already been tombstone + if store.IsTombstone() { + continue + } + + if store.IsUp() && !store.IsLowSpace(cluster.GetLowSpaceRatio()) { + upStoreCount++ + continue } + offlineStore := store.Store // If the store is empty, it can be buried. if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 { - err := c.BuryStore(offlineStore.GetId(), false) - if err != nil { + if err := c.BuryStore(offlineStore.GetId(), false); err != nil { log.Errorf("bury store %v failed: %v", offlineStore, err) - } else { - log.Infof("buried store %v", offlineStore) } } else { offlineStores = append(offlineStores, offlineStore) diff --git a/server/leader.go b/server/leader.go index 2f7e2b974e24..132c3f158b09 100644 --- a/server/leader.go +++ b/server/leader.go @@ -299,14 +299,17 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + // The revision is the revision of last modification on this key. + // If the revision is compacted, will meet required revision has been compacted error. + // In this case, use the compact revision to re-watch the key. for { // gofail: var delayWatcher struct{} rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision)) for wresp := range rch { - // meet compacted error, use current revision. + // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { - log.Warnf("required revision %d has been compacted, use current revision", revision) - revision = 0 + log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) + revision = wresp.CompactRevision break } if wresp.Canceled {