Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*:Rollback config in store when kv.persist failed #1476

Merged
merged 34 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b966728
Merge pull request #1 from pingcap/master
bradyjoestar Mar 20, 2019
b263e13
Merge pull request #2 from pingcap/master
bradyjoestar Mar 24, 2019
3af8fe3
clear store when persist failed
swbsin Mar 25, 2019
7c02023
log more info
swbsin Mar 25, 2019
bc77251
gofmt
swbsin Mar 25, 2019
c677fd2
fix error info
swbsin Mar 25, 2019
98f9c4b
Merge branch 'master' into issue-1475
bradyjoestar Mar 25, 2019
5abed59
Merge branch 'master' into issue-1475
bradyjoestar Mar 27, 2019
8e1ec51
Merge branch 'master' into issue-1475
bradyjoestar Mar 27, 2019
a77a952
add unit_test
swbsin Mar 27, 2019
3a758c3
try to debug jenkins error
swbsin Mar 27, 2019
237f5a5
try to fix jenkins bug
swbsin Mar 27, 2019
892d1b7
try to fix jenkins error
swbsin Mar 27, 2019
19a566b
fix data race bug
swbsin Mar 27, 2019
b5fbb9d
prolong time for region kv to flush
swbsin Mar 27, 2019
5548cdc
Update region_syncer_test.go
bradyjoestar Mar 27, 2019
ea2da65
better output log
swbsin Mar 29, 2019
2441a70
Merge branch 'master' into issue-1475
nolouch Apr 9, 2019
800ed08
Merge branch 'master' into issue-1475
bradyjoestar Apr 10, 2019
300ac33
convert 5s to 3s
bradyjoestar Apr 11, 2019
8a090f4
Merge branch 'issue-1475' of github.com:bradyjoestar/pd into issue-1475
bradyjoestar Apr 11, 2019
fbb7cc8
Merge branch 'master' into issue-1475
nolouch Apr 15, 2019
54e34c0
Merge branch 'master' into issue-1475
bradyjoestar Apr 17, 2019
28689dc
replace sync.map
bradyjoestar Apr 17, 2019
bd2754c
fix ci bug
bradyjoestar Apr 17, 2019
2f777cc
rebuild jenkins
bradyjoestar Apr 17, 2019
6ad3899
Merge branch 'master' into issue-1475
bradyjoestar Apr 18, 2019
51e9451
Merge branch 'master' into issue-1475
bradyjoestar Apr 19, 2019
21c62e4
Merge branch 'master' into issue-1475
nolouch Apr 25, 2019
4b64562
extract function
bradyjoestar May 21, 2019
0ec5424
go fmt
bradyjoestar May 21, 2019
4db8982
revert system_mon
bradyjoestar May 21, 2019
0446e27
Merge branch 'master' into issue-1475
bradyjoestar May 21, 2019
b710e24
Merge branch 'master' into issue-1475
nolouch May 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

Expand All @@ -44,6 +45,14 @@ type testClusterSuite struct {
baseCluster
}

type testErrorKV struct {
core.KVBase
}

func (kv *testErrorKV) Save(key, value string) error {
return errors.New("save failed")
}

func mustNewGrpcClient(c *C, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

Expand Down Expand Up @@ -597,3 +606,84 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) {
s.cluster.core.Stores.GetStores()
}
}

func (s *testClusterSuite) TestSetScheduleOpt(c *C) {
var err error
var cleanup func()
_, s.svr, cleanup, err = NewTestServer(c)
c.Assert(err, IsNil)
mustWaitLeader(c, []*Server{s.svr})
s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
defer cleanup()
clusterID := s.svr.clusterID

storeAddr := "127.0.0.1:0"
_, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr))
c.Assert(err, IsNil)

_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)

scheduleCfg := opt.load()
replicateCfg := s.svr.GetReplicationConfig()
pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig()

//PUT GET DELETE successed
replicateCfg.MaxReplicas = 5
scheduleCfg.MaxSnapshotCount = 10
pdServerCfg.UseRegionStorage = true
typ, labelKey, labelValue := "testTyp", "testKey", "testValue"
nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)}

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))

c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil)
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

//PUT GET failed
oldKV := s.svr.kv
s.svr.kv = core.NewKV(&testErrorKV{})
replicateCfg.MaxReplicas = 7
scheduleCfg.MaxSnapshotCount = 20
pdServerCfg.UseRegionStorage = false

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

//DELETE failed
s.svr.kv = oldKV
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

s.svr.kv = core.NewKV(&testErrorKV{})
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
}
61 changes: 43 additions & 18 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"reflect"
"sync"
"sync/atomic"
"time"

Expand All @@ -28,7 +29,7 @@ import (
type scheduleOption struct {
v atomic.Value
rep *Replication
ns map[string]*namespaceOption
ns sync.Map // concurrent map[string]*namespaceOption
labelProperty atomic.Value
clusterVersion atomic.Value
pdServerConfig atomic.Value
Expand All @@ -37,10 +38,10 @@ type scheduleOption struct {
func newScheduleOption(cfg *Config) *scheduleOption {
o := &scheduleOption{}
o.store(&cfg.Schedule)
o.ns = make(map[string]*namespaceOption)
o.ns = sync.Map{}
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
o.ns[name] = newNamespaceOption(&nsCfg)
o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.rep = newReplication(&cfg.Replication)
o.pdServerConfig.Store(&cfg.PDServerCfg)
Expand All @@ -61,8 +62,36 @@ func (o *scheduleOption) GetReplication() *Replication {
return o.rep
}

func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) {
if n, ok := o.ns.Load(name); ok {
if n, ok := n.(*namespaceOption); ok {
return n, true
}
}
return nil, false
}

func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig {
namespaces := make(map[string]NamespaceConfig)
f := func(k, v interface{}) bool {
var kstr string
var ok bool
if kstr, ok = k.(string); !ok {
return false
}
if ns, ok := v.(*namespaceOption); ok {
namespaces[kstr] = *ns.load()
return true
}
return false
}
o.ns.Range(f)

return namespaces
}

func (o *scheduleOption) GetMaxReplicas(name string) int {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetMaxReplicas()
}
return o.rep.GetMaxReplicas()
Expand Down Expand Up @@ -105,35 +134,35 @@ func (o *scheduleOption) GetMaxStoreDownTime() time.Duration {
}

func (o *scheduleOption) GetLeaderScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetLeaderScheduleLimit()
}
return o.load().LeaderScheduleLimit
}

func (o *scheduleOption) GetRegionScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetRegionScheduleLimit()
}
return o.load().RegionScheduleLimit
}

func (o *scheduleOption) GetReplicaScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetReplicaScheduleLimit()
}
return o.load().ReplicaScheduleLimit
}

func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetMergeScheduleLimit()
}
return o.load().MergeScheduleLimit
}

func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetHotRegionScheduleLimit()
}
return o.load().HotRegionScheduleLimit
Expand Down Expand Up @@ -272,10 +301,8 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig {
}

func (o *scheduleOption) persist(kv *core.KV) error {
namespaces := make(map[string]NamespaceConfig)
for name, ns := range o.ns {
namespaces[name] = *ns.load()
}
namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load(),
Replication: *o.rep.load(),
Expand All @@ -289,10 +316,8 @@ func (o *scheduleOption) persist(kv *core.KV) error {
}

func (o *scheduleOption) reload(kv *core.KV) error {
namespaces := make(map[string]NamespaceConfig)
for name, ns := range o.ns {
namespaces[name] = *ns.load()
}
namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load().clone(),
Replication: *o.rep.load(),
Expand All @@ -311,7 +336,7 @@ func (o *scheduleOption) reload(kv *core.KV) error {
o.rep.store(&cfg.Replication)
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
o.ns[name] = newNamespaceOption(&nsCfg)
o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.labelProperty.Store(cfg.LabelProperty)
o.clusterVersion.Store(cfg.ClusterVersion)
Expand Down
Loading