Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: use separate participant #7032

Merged
merged 4 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
4 changes: 4 additions & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{
"name": "pd-tso-bench",
"path": "tools/pd-tso-bench"
},
{
"name": "pd-api-bench",
"path": "tools/pd-api-bench"
}
],
"settings": {}
Expand Down
14 changes: 8 additions & 6 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package server

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -30,6 +28,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -284,10 +283,13 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName)
p := &resource_manager.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
13 changes: 8 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -370,10 +370,13 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.SchedulingServiceName)
p := &schedulingpb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
Expand Down
13 changes: 6 additions & 7 deletions pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
Expand Down Expand Up @@ -64,14 +63,14 @@ func (l *EmbeddedEtcdLeader) Watch(ctx context.Context) {
// EtcdLeader is the leader in the election group backed by the etcd, but it's
// decoupled from the embedded etcd.
type EtcdLeader struct {
wrapper *Participant
pariticipant *tsopb.Participant
revision int64
wrapper *Participant
participant participant
revision int64
}

// GetListenUrls returns current leader's client urls
func (l *EtcdLeader) GetListenUrls() []string {
return l.pariticipant.GetListenUrls()
return l.participant.GetListenUrls()
}

// GetRevision returns the revision of the leader in etcd
Expand All @@ -81,10 +80,10 @@ func (l *EtcdLeader) GetRevision() int64 {

// String declares fmt.Stringer
func (l *EtcdLeader) String() string {
return l.pariticipant.String()
return l.participant.String()
}

// Watch on the leader
func (l *EtcdLeader) Watch(ctx context.Context) {
l.wrapper.WatchLeader(ctx, l.pariticipant, l.revision)
l.wrapper.WatchLeader(ctx, l.participant, l.revision)
}
94 changes: 63 additions & 31 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,42 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type leadershipCheckFunc func(*election.Leadership) bool

type participant interface {
GetName() string
GetId() uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about GetID

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's generated by protobuf.

GetListenUrls() []string
String() string
Marshal() ([]byte, error)
Reset()
ProtoMessage()
}

// Participant is used for the election related logic. Compared to its counterpart
// EmbeddedEtcdMember, Participant relies on etcd for election, but it's decoupled
// from the embedded etcd. It implements Member interface.
type Participant struct {
leadership *election.Leadership
// stored as member type
leader atomic.Value
client *clientv3.Client
rootPath string
leaderPath string
member *tsopb.Participant
leader atomic.Value
client *clientv3.Client
rootPath string
leaderPath string
member participant
serviceName string
// memberValue is the serialized string of `member`. It will be saved in the
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
Expand All @@ -56,42 +70,37 @@ type Participant struct {
}

// NewParticipant creates a new Participant.
func NewParticipant(client *clientv3.Client) *Participant {
func NewParticipant(client *clientv3.Client, serviceName string) *Participant {
return &Participant{
client: client,
client: client,
serviceName: serviceName,
}
}

// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName)
func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderName string, purpose string, advertiseListenAddr string) {
leader := &tsopb.Participant{
Name: name,
Id: id, // id is unique among all participants
ListenUrls: []string{advertiseListenAddr},
}

data, err := leader.Marshal()
func (m *Participant) InitInfo(p participant, rootPath string, leaderName string, purpose string) {
data, err := p.Marshal()
if err != nil {
// can't fail, so panic here.
log.Fatal("marshal leader meet error", zap.Stringer("leader-name", leader), errs.ZapError(errs.ErrMarshalLeader, err))
log.Fatal("marshal leader meet error", zap.String("member-name", p.String()), errs.ZapError(errs.ErrMarshalLeader, err))
}
m.member = leader
m.member = p
m.memberValue = string(data)
m.rootPath = rootPath
m.leaderPath = path.Join(rootPath, leaderName)
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.leaderPath))
}

// ID returns the unique ID for this participant in the election group
func (m *Participant) ID() uint64 {
return m.member.Id
return m.member.GetId()
}

// Name returns the unique name in the election group.
func (m *Participant) Name() string {
return m.member.Name
return m.member.GetName()
}

// GetMember returns the member.
Expand All @@ -112,6 +121,9 @@ func (m *Participant) Client() *clientv3.Client {
// IsLeader returns whether the participant is the leader or not by checking its
// leadership's lease and leader info. It returns false when no leader is
// currently recorded, when the recorded leader's ID differs from this
// participant's ID, or when campaignCheck fails.
func (m *Participant) IsLeader() bool {
	// Load the leader exactly once. The stored leader is replaced by
	// setLeader/unsetLeader, and GetLeader returns nil once the stored ID is 0,
	// so calling GetLeader again after the nil check could yield nil and make
	// the GetId call below panic.
	leader := m.GetLeader()
	if leader == nil {
		return false
	}
	return m.leadership.Check() && leader.GetId() == m.member.GetId() && m.campaignCheck()
}

Expand All @@ -122,6 +134,9 @@ func (m *Participant) IsLeaderElected() bool {

// GetLeaderListenUrls returns the current leader's listen URLs, or nil when no
// leader is currently recorded.
func (m *Participant) GetLeaderListenUrls() []string {
	// Read the leader a single time so the nil check and the dereference
	// observe the same value even if the leader is unset in between.
	leader := m.GetLeader()
	if leader == nil {
		return nil
	}
	return leader.GetListenUrls()
}

Expand All @@ -131,27 +146,36 @@ func (m *Participant) GetLeaderID() uint64 {
}

// GetLeader returns current leader of the election group.
func (m *Participant) GetLeader() *tsopb.Participant {
func (m *Participant) GetLeader() participant {
leader := m.leader.Load()
if leader == nil {
return nil
}
member := leader.(*tsopb.Participant)
member := leader.(participant)
if member.GetId() == 0 {
return nil
}
return member
}

// setLeader sets the member's leader.
func (m *Participant) setLeader(member *tsopb.Participant) {
func (m *Participant) setLeader(member participant) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
m.leader.Store(&tsopb.Participant{})
var leader participant
switch m.serviceName {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}

Expand Down Expand Up @@ -200,8 +224,16 @@ func (m *Participant) PreCheckLeader() error {
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (*tsopb.Participant, int64, error) {
leader := &tsopb.Participant{}
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -248,14 +280,14 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
}

return &EtcdLeader{
wrapper: m,
pariticipant: leader,
revision: revision,
wrapper: m,
participant: leader,
revision: revision,
}, false
}

// WatchLeader is used to watch the changes of the leader.
func (m *Participant) WatchLeader(ctx context.Context, leader *tsopb.Participant, revision int64) {
func (m *Participant) WatchLeader(ctx context.Context, leader participant, revision int64) {
m.setLeader(leader)
m.leadership.Watch(ctx, revision)
m.unsetLeader()
Expand All @@ -269,7 +301,7 @@ func (m *Participant) ResetLeader() {
}

// IsSameLeader checks whether a server is the leader itself.
func (m *Participant) IsSameLeader(leader *tsopb.Participant) bool {
func (m *Participant) IsSameLeader(leader participant) bool {
return leader.GetId() == m.ID()
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -736,10 +737,13 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.PrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
participant := member.NewParticipant(kgm.etcdClient, mcsutils.TSOServiceName)
p := &tsopb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()},
}
participant.InitInfo(p, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), mcsutils.PrimaryKey, "keyspace group primary election")
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
require (
github.com/docker/go-units v0.4.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
Loading