Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jul 3, 2024
1 parent dbc5447 commit 204ffd5
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 25 deletions.
8 changes: 4 additions & 4 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
}

var primaryIDs []string
var secondaryValues []string
var memberValues []string
for _, member := range entries {
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
if string(member.MemberValue) == "" {
return errors.New(fmt.Sprintf("member %s value is empty", member.Name))
}
secondaryValues = append(secondaryValues, string(member.MemberValue))
memberValues = append(memberValues, string(member.MemberValue))
}
}
if len(primaryIDs) == 0 {
Expand All @@ -128,7 +128,7 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
}

// remove possible residual value.
utils.RemoveExpectedPrimary(client, primaryKey)
utils.ClearPrimaryExpectationFlag(client, primaryKey)

// grant the primary lease to the new primary.
grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease)
Expand All @@ -137,7 +137,7 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
}
// update primary key to notify old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, secondaryValues[nextPrimaryID], clientv3.WithLease(grantResp.ID))).
Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID], clientv3.WithLease(grantResp.ID))).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s, err: %v", serviceName, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *Server) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimary(s.GetClient(), s.participant.GetLeaderPath())
expectedPrimary := utils.AttachExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `/ms/primary/transfer` API.
if expectedPrimary != "" && expectedPrimary != s.participant.MemberValue() {
Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
}
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
// 1. modify the expected primary flag to the new primary.
utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())
utils.MarkExpectedPrimaryFlag(s.participant.Client(), s.participant.GetLeaderPath())
// 2. modify memory status.
s.participant.UnsetLeader()
defer log.Info("scheduling primary exit the primary watch loop")
Expand Down
30 changes: 15 additions & 15 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ const (
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
// ExpectedPrimary is the path to store the expected primary , ONLY Triggered BY `/ms/primary/transfer` API.
// ExpectedPrimaryFlag is the flag to indicate the expected primary, ONLY marked BY `/ms/primary/transfer` API.
// This flag likes a fence to avoid exited 2 primaries in the cluster simultaneously.
// 1. Since follower will campaign a new primary when it found the `leader_key` is deleted.
// **We can ensure `expected_primary` is set before deleting the `leader_key`.**
// 2. Old primary will set `expected_primary` firstly,
// 2. Old primary will mark `expected_primary` firstly,
// then delete the `leader_key` which will trigger the follower to campaign a new primary.
ExpectedPrimary = "expected_primary"
ExpectedPrimaryFlag = "expected_primary"
)

// InitClusterID initializes the cluster ID.
Expand All @@ -78,35 +78,35 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
}

// GetExpectedPrimary indicates API has changed the primary.
func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string {
primary, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))
// AttachExpectedPrimaryFlag attaches the expected primary flag.
func AttachExpectedPrimaryFlag(client *clientv3.Client, leaderPath string) string {
primary, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimaryFlag}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
log.Error("get expected primary flag error", errs.ZapError(err))
return ""
}

return string(primary)
}

// RemoveExpectedPrimary removes the expected primary key.
// ClearPrimaryExpectationFlag clears the expected primary flag.
// - removed when campaign new primary successfully.
// - removed when appoint new primary by API.
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("remove expected primary key", zap.String("leader-path", leaderPath))
func ClearPrimaryExpectationFlag(client *clientv3.Client, leaderPath string) {
log.Info("remove expected primary flag", zap.String("primary-path", leaderPath))
// remove expected leader key
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimaryFlag}, "/"))).
Commit()
if err != nil || !resp.Succeeded {
log.Error("change expected primary error", errs.ZapError(err))
return
}
}

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("set expected primary key", zap.String("leader-path", leaderPath))
// MarkExpectedPrimaryFlag marks the expected primary flag when the current primary has exited.
func MarkExpectedPrimaryFlag(client *clientv3.Client, leaderPath string) {
log.Info("set expected primary flag", zap.String("leader-path", leaderPath))
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("get primary key error", zap.Error(err))
Expand All @@ -120,7 +120,7 @@ func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
// write a flag to indicate the current primary has exited
resp, err := kv.NewSlowLogTxn(client).
Then(
clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"), string(leaderRaw), clientv3.WithLease(grantResp.ID)),
clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimaryFlag}, "/"), string(leaderRaw), clientv3.WithLease(grantResp.ID)),
// indicate the current primary has exited
clientv3.OpDelete(leaderPath)).
Commit()
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (m *EmbeddedEtcdMember) UnsetLeader() {
// EnableLeader sets the member itself to a PD leader.
func (m *EmbeddedEtcdMember) EnableLeader() {
m.setLeader(m.member)
utils.RemoveExpectedPrimary(m.client, m.GetLeaderPath())
utils.ClearPrimaryExpectationFlag(m.client, m.GetLeaderPath())
}

// GetLeaderPath returns the path of the PD leader.
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (m *Participant) UnsetLeader() {
// EnableLeader declares the member itself to be the leader.
func (m *Participant) EnableLeader() {
m.setLeader(m.member)
utils.RemoveExpectedPrimary(m.client, m.GetLeaderPath())
utils.ClearPrimaryExpectationFlag(m.client, m.GetLeaderPath())
}

// GetLeaderPath returns the path of the leader.
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := mcsutils.GetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())
expectedPrimary := mcsutils.AttachExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath())
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `/ms/primary/transfer` API.
if expectedPrimary != "" && expectedPrimary != gta.member.MemberValue() {
Expand Down Expand Up @@ -703,7 +703,7 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha
}
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
// 1. modify the expected primary flag to the new primary.
mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())
mcsutils.MarkExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath())
// 2. modify memory status.
gta.member.UnsetLeader()
defer log.Info("tso primary exit the primary watch loop")
Expand Down

0 comments on commit 204ffd5

Please sign in to comment.