Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-master: leader election #367

Merged
merged 20 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium]
ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed"
ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid"
ErrRelayLogGivenPosTooBig,[code=11106:class=functional:scope=internal:level=high],"the given relay log pos %s of meta config is too big, please check it again"
ErrElectionCampaignFail,[code=11107:class=functional:scope=internal:level=high],"fail to campaign leader: %s"
ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=medium],"fail to get leader ID"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s"
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s"
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s"
Expand Down
49 changes: 49 additions & 0 deletions dm/master/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"

"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
)

func (s *Server) electionNotify(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need this function, seems toBeLeader and retireLeader already print the log

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toBeLeader and retireLeader only log the leadership of the current member.
this function is used to show the usage of Election (and also log the leader name even if it's not the current member), do you think should we need to remove it?

Copy link
Member Author

@csuzhangxc csuzhangxc Nov 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thing, pkg/election is a pkg, but this function is in the application. Maybe we need to add some metrics for the leadership later, and do that in this function is better.

so, how about removing the log in pkg/election?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if master needs to do something(for example: meet error in the election, master exit) according to the election's result I think putting here is better.
it is up to you

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to remove the log in pkg/election in b99ae46.

for {
select {
case <-ctx.Done():
return
case leader := <-s.election.LeaderNotify():
// output the leader info.
if leader {
log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
} else {
_, leaderID, err2 := s.election.LeaderInfo(ctx)
if err2 == nil {
log.L().Info("current member retire from the leader", zap.String("leader", leaderID), zap.String("current member", s.cfg.Name))
} else {
log.L().Error("get leader info", zap.Error(err2))
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
}
}
case err := <-s.election.ErrorNotify():
// handle errors here, we do no meaningful things now.
// but maybe:
// 1. trigger an alert
// 2. shutdown the DM-master process
log.L().Error("receive error from election", zap.Error(err))
}
}
}
6 changes: 1 addition & 5 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -119,10 +118,7 @@ func prepareJoinEtcd(cfg *Config) error {
}

// if without previous data, we need a client to contact with the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","))
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
Expand Down
42 changes: 42 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -34,12 +35,23 @@ import (
operator "github.com/pingcap/dm/dm/master/sql-operator"
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/syncer"
)

const (
// the session's TTL in seconds for leader election.
// NOTE: select this value carefully when adding a mechanism relying on leader election.
electionTTL = 60
// the DM-master leader election key prefix
// DM-master cluster : etcd cluster = 1 : 1 now.
electionKey = "/dm-master/leader"
)

var (
fetchDDLInfoRetryTimeout = 5 * time.Second
)
Expand All @@ -53,6 +65,9 @@ type Server struct {
// the embed etcd server, and the gRPC/HTTP API server also attached to it.
etcd *embed.Etcd

etcdClient *clientv3.Client
election *election.Election

// WaitGroup for background functions.
bgFunWg sync.WaitGroup

Expand Down Expand Up @@ -151,6 +166,19 @@ func (s *Server) Start(ctx context.Context) (err error) {
return
}

// create an etcd client used in the whole server instance.
// NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed.
s.etcdClient, err = etcdutil.CreateClient([]string{s.cfg.MasterAddr})
if err != nil {
return
}

// start leader election
s.election, err = election.NewElection(ctx, s.etcdClient, electionTTL, electionKey, s.cfg.Name)
if err != nil {
return
}

s.closed.Set(false) // the server started now.

s.bgFunWg.Add(1)
Expand All @@ -159,6 +187,12 @@ func (s *Server) Start(ctx context.Context) (err error) {
s.ap.Start(ctx)
}()

s.bgFunWg.Add(1)
go func() {
defer s.bgFunWg.Done()
s.electionNotify(ctx)
}()

s.bgFunWg.Add(1)
go func() {
defer s.bgFunWg.Done()
Expand Down Expand Up @@ -194,6 +228,14 @@ func (s *Server) Close() {
// wait for background functions returned
s.bgFunWg.Wait()

if s.election != nil {
s.election.Close()
}

if s.etcdClient != nil {
s.etcdClient.Close()
}

// close the etcd and other attached servers
if s.etcd != nil {
s.etcd.Close()
Expand Down
18 changes: 12 additions & 6 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -1548,6 +1547,11 @@ func (t *testMaster) TestJoinMember(c *check.C) {
c.Assert(s1.Start(ctx), check.IsNil)
defer s1.Close()

// wait the first one become the leader
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s1.election.IsLeader()
}), check.IsTrue)

// join to an existing cluster
cfg2 := NewConfig()
c.Assert(cfg2.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)
Expand All @@ -1562,14 +1566,11 @@ func (t *testMaster) TestJoinMember(c *check.C) {
c.Assert(s2.Start(ctx), check.IsNil)
defer s2.Close()

client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg1.AdvertisePeerUrls, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","))
c.Assert(err, check.IsNil)
defer client.Close()

// verify membersm
// verify members
listResp, err := etcdutil.ListMembers(client)
c.Assert(err, check.IsNil)
c.Assert(listResp.Members, check.HasLen, 2)
Expand All @@ -1582,5 +1583,10 @@ func (t *testMaster) TestJoinMember(c *check.C) {
_, ok = names[cfg2.Name]
c.Assert(ok, check.IsTrue)

// s1 is still the leader
_, leaderID, err := s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)

cancel()
}
Loading