Skip to content

Commit

Permalink
dm-master: leader election (pingcap#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Nov 26, 2019
1 parent 3f6177c commit 18f66f2
Show file tree
Hide file tree
Showing 11 changed files with 694 additions and 19 deletions.
2 changes: 2 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium]
ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed"
ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid"
ErrRelayLogGivenPosTooBig,[code=11106:class=functional:scope=internal:level=high],"the given relay log pos %s of meta config is too big, please check it again"
ErrElectionCampaignFail,[code=11107:class=functional:scope=internal:level=high],"fail to campaign leader: %s"
ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=medium],"fail to get leader ID"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s"
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s"
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s"
Expand Down
49 changes: 49 additions & 0 deletions dm/master/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"

"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
)

// electionNotify drains the notification channels exposed by s.election until
// ctx is done. It only logs what it receives:
//   - a true value from LeaderNotify is logged as this member becoming the leader;
//   - a false value is logged as this member retiring, together with the leader
//     ID reported by LeaderInfo (or a warning if that lookup fails);
//   - any value from ErrorNotify is logged as an error.
//
// The function blocks, so callers are expected to run it in its own goroutine.
func (s *Server) electionNotify(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return

		case isLeader := <-s.election.LeaderNotify():
			// report the new role of the current member.
			if isLeader {
				log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
				continue
			}

			// no longer the leader: look up the leader ID so it can be logged too.
			_, leaderID, infoErr := s.election.LeaderInfo(ctx)
			if infoErr != nil {
				log.L().Warn("get leader info", zap.Error(infoErr))
				continue
			}
			log.L().Info("current member retire from the leader", zap.String("leader", leaderID), zap.String("current member", s.cfg.Name))

		case electionErr := <-s.election.ErrorNotify():
			// errors are only logged for now; possible future handling:
			// 1. trigger an alert
			// 2. shutdown the DM-master process
			log.L().Error("receive error from election", zap.Error(electionErr))
		}
	}
}
6 changes: 1 addition & 5 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -119,10 +118,7 @@ func prepareJoinEtcd(cfg *Config) error {
}

// if there is no previous data, we need a client to contact the existing cluster.
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg.Join, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","))
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
Expand Down
42 changes: 42 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -34,12 +35,23 @@ import (
operator "github.com/pingcap/dm/dm/master/sql-operator"
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/syncer"
)

// Settings for the DM-master leader election.
const (
	// electionKey is the etcd key prefix used for the DM-master leader election.
	// One DM-master cluster maps to exactly one etcd cluster for now.
	electionKey = "/dm-master/leader"
	// electionTTL is the TTL, in seconds, of the session used for leader election.
	// NOTE: choose this value carefully when adding a mechanism that relies on leader election.
	electionTTL = 60
)

var (
fetchDDLInfoRetryTimeout = 5 * time.Second
)
Expand All @@ -53,6 +65,9 @@ type Server struct {
// the embed etcd server, and the gRPC/HTTP API server also attached to it.
etcd *embed.Etcd

etcdClient *clientv3.Client
election *election.Election

// WaitGroup for background functions.
bgFunWg sync.WaitGroup

Expand Down Expand Up @@ -151,6 +166,19 @@ func (s *Server) Start(ctx context.Context) (err error) {
return
}

// create an etcd client used in the whole server instance.
// NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed.
s.etcdClient, err = etcdutil.CreateClient([]string{s.cfg.MasterAddr})
if err != nil {
return
}

// start leader election
s.election, err = election.NewElection(ctx, s.etcdClient, electionTTL, electionKey, s.cfg.Name)
if err != nil {
return
}

s.closed.Set(false) // the server started now.

s.bgFunWg.Add(1)
Expand All @@ -159,6 +187,12 @@ func (s *Server) Start(ctx context.Context) (err error) {
s.ap.Start(ctx)
}()

s.bgFunWg.Add(1)
go func() {
defer s.bgFunWg.Done()
s.electionNotify(ctx)
}()

s.bgFunWg.Add(1)
go func() {
defer s.bgFunWg.Done()
Expand Down Expand Up @@ -194,6 +228,14 @@ func (s *Server) Close() {
// wait for background functions returned
s.bgFunWg.Wait()

if s.election != nil {
s.election.Close()
}

if s.etcdClient != nil {
s.etcdClient.Close()
}

// close the etcd and other attached servers
if s.etcd != nil {
s.etcd.Close()
Expand Down
18 changes: 12 additions & 6 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -1548,6 +1547,11 @@ func (t *testMaster) TestJoinMember(c *check.C) {
c.Assert(s1.Start(ctx), check.IsNil)
defer s1.Close()

// wait for the first one to become the leader
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s1.election.IsLeader()
}), check.IsTrue)

// join to an existing cluster
cfg2 := NewConfig()
c.Assert(cfg2.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)
Expand All @@ -1562,14 +1566,11 @@ func (t *testMaster) TestJoinMember(c *check.C) {
c.Assert(s2.Start(ctx), check.IsNil)
defer s2.Close()

client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(cfg1.AdvertisePeerUrls, ","),
DialTimeout: etcdutil.DefaultDialTimeout,
})
client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","))
c.Assert(err, check.IsNil)
defer client.Close()

// verify membersm
// verify members
listResp, err := etcdutil.ListMembers(client)
c.Assert(err, check.IsNil)
c.Assert(listResp.Members, check.HasLen, 2)
Expand All @@ -1582,5 +1583,10 @@ func (t *testMaster) TestJoinMember(c *check.C) {
_, ok = names[cfg2.Name]
c.Assert(ok, check.IsTrue)

// s1 is still the leader
_, leaderID, err := s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)

cancel()
}
Loading

0 comments on commit 18f66f2

Please sign in to comment.