-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathelection_manager.go
166 lines (156 loc) · 4.78 KB
/
election_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package electing_master
import (
"errors"
"github.com/samuel/go-zookeeper/zk"
"log"
"strings"
"time"
)
// ZookeeperConfig carries the connection and znode-path settings used by an
// ElectionManager.
type ZookeeperConfig struct {
	// Servers lists the zookeeper endpoints passed to zk.Connect.
	Servers []string
	// RootPath is the parent znode of the election; MasterPath is appended
	// to it to form the full path of the election znode.
	RootPath, MasterPath string
}
// ElectionManager bundles everything one election participant needs: the
// zookeeper connection, the path configuration and the result channel.
//
// The field order is significant: GoElectingMaster builds this struct with a
// positional (unkeyed) literal.
type ElectionManager struct {
	// ZKClientConn is the live zookeeper connection; it stays nil until
	// initConnection has connected successfully.
	ZKClientConn *zk.Conn

	// ZKConfig holds the server list and the znode paths used for the election.
	ZKConfig *ZookeeperConfig

	// IsMasterQ receives true when this process wins an election round and
	// false when it loses one (see electMaster).
	IsMasterQ chan bool
}
// Default znode paths applied by GoElectingMaster.
const (
	// DefaultRootPath is the root znode used when the address string does
	// not carry a "/{rootPath}" suffix.
	DefaultRootPath = "/GOLANG_ELECTING_MASTER"

	// DefaultMasterPath is the name of the election znode, appended to the
	// root path.
	DefaultMasterPath = "/MASTER"
)
// GoElectingMaster parses zkAddr, runs one election round and, if that round
// completes without error, starts a goroutine that watches the master znode.
//
// zkAddr format: {ip1}:{port1},{ip2}:{port2},{ip3}:{port3}/{rootPath}, where
// the rootPath part is optional and defaults to DefaultRootPath.
//
//	eg1: 127.0.0.1
//	eg2: 127.0.0.1:2080,127.0.0.1:2081,127.0.0.1:2082
//	eg3: 127.0.0.1:2080,127.0.0.1:2081,127.0.0.1:2082/__ELECTING_MASTER__
//
// The result of every election round is sent on isMasterQ (true = this
// process is master). The first result is sent from inside this call, so an
// unbuffered channel without a ready receiver makes the call block.
//
// It returns an error when zkAddr is empty or the first election round fails.
func GoElectingMaster(zkAddr string, isMasterQ chan bool) error {
	if zkAddr == "" {
		return errors.New("empty zookeeper address.")
	}

	zkConf := &ZookeeperConfig{
		RootPath:   DefaultRootPath,
		MasterPath: DefaultMasterPath,
	}

	// Cut at the FIRST "/" only, so a nested root path such as "a/b" is kept
	// whole instead of being truncated to its first component. Surrounding
	// slashes are trimmed because a znode path must not end in "/".
	// NOTE(review): electMaster issues a single Create for the root, so the
	// parents of a nested root path presumably have to exist already.
	serversStr, pathStr := zkAddr, ""
	if i := strings.Index(zkAddr, "/"); i >= 0 {
		serversStr = zkAddr[:i]
		pathStr = strings.Trim(zkAddr[i+1:], "/")
	}
	if pathStr != "" {
		zkConf.RootPath = "/" + pathStr
	}

	// Split yields a one-element slice when there is no comma.
	zkConf.Servers = strings.Split(serversStr, ",")

	electionManager := &ElectionManager{
		ZKConfig:  zkConf,
		IsMasterQ: isMasterQ,
	}
	if err := electionManager.electMaster(); err != nil {
		// The manager is discarded on failure; release its connection so
		// the client is not left running in the background.
		if electionManager.ZKClientConn != nil {
			electionManager.ZKClientConn.Close()
		}
		return err
	}

	go electionManager.WatchMaster()
	return nil
}
// isConnected reports whether the manager currently holds a usable zookeeper
// connection.
func (electionManager *ElectionManager) isConnected() bool {
	conn := electionManager.ZKClientConn
	if conn == nil {
		return false
	}
	// The client reports StateConnected only briefly: once the session
	// handshake completes it moves on to StateHasSession. Both mean the
	// connection is usable; accepting only StateConnected would make a
	// healthy connection look broken and trigger a needless reconnect.
	switch conn.State() {
	case zk.StateConnected, zk.StateHasSession:
		return true
	}
	return false
}
// initConnection makes sure the manager holds a connected zookeeper client.
//
// When no connection exists, or the existing one is not connected, it dials
// the configured servers and waits up to three seconds for the connected
// event. It returns an error if dialing fails, the wait times out, or the
// event channel is closed before the connection is established.
func (electionManager *ElectionManager) initConnection() error {
	if electionManager.isConnected() {
		return nil
	}

	conn, connChan, err := zk.Connect(electionManager.ZKConfig.Servers, time.Second)
	if err != nil {
		return err
	}

	// A single timer bounds the whole wait. Creating a fresh timeout on
	// every loop iteration would restart the three seconds each time an
	// unrelated state event arrives.
	timeout := time.NewTimer(3 * time.Second)
	defer timeout.Stop()

	for connected := false; !connected; {
		select {
		case connEvent, ok := <-connChan:
			if !ok {
				// A closed channel would otherwise deliver zero-value
				// events forever and spin this loop.
				conn.Close()
				return errors.New("zookeeper event channel closed before connecting")
			}
			if connEvent.State == zk.StateConnected {
				connected = true
				log.Println("connect to zookeeper server success!")
			}
		case <-timeout.C:
			// Still not connected after three seconds: give up and
			// release the half-open client so it does not leak.
			conn.Close()
			return errors.New("connect to zookeeper server timeout!")
		}
	}

	// Release the connection being replaced before storing the new one.
	if old := electionManager.ZKClientConn; old != nil {
		old.Close()
	}
	electionManager.ZKClientConn = conn
	return nil
}
// electMaster runs one election round.
//
// It ensures a connection and the root znode exist, then tries to create the
// ephemeral master znode. Creating it means this process is master and true
// is sent on IsMasterQ; any failure to create it is logged, false is sent on
// IsMasterQ, and nil is returned. A non-nil error is returned only when the
// connection or the root znode cannot be set up, or when zookeeper reports a
// created path different from the requested one.
func (electionManager *ElectionManager) electMaster() error {
	if err := electionManager.initConnection(); err != nil {
		return err
	}
	conn := electionManager.ZKClientConn
	rootPath := electionManager.ZKConfig.RootPath

	// Create the root znode if it does not exist yet.
	isExist, _, err := conn.Exists(rootPath)
	if err != nil {
		return err
	}
	if !isExist {
		path, err := conn.Create(rootPath, nil, 0, zk.WorldACL(zk.PermAll))
		switch {
		case err == zk.ErrNodeExists:
			// Another candidate created the root between our Exists and
			// Create calls; the root is there, which is all we need.
		case err != nil:
			return err
		case path != rootPath:
			return errors.New("Create returned different path " + rootPath + " != " + path)
		}
	}

	// The election znode is ephemeral: zookeeper removes it when the
	// session of the client that created it ends.
	masterPath := rootPath + electionManager.ZKConfig.MasterPath
	path, err := conn.Create(masterPath, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err != nil {
		// Failing to create the znode means this round was lost.
		log.Printf("elect master failure: %s", err)
		electionManager.IsMasterQ <- false
		return nil
	}
	if path != masterPath {
		return errors.New("Create returned different path " + masterPath + " != " + path)
	}

	// Creating the znode means this process won the election.
	log.Println("elect master success!")
	electionManager.IsMasterQ <- true
	return nil
}
// WatchMaster watches the master znode and starts a new election round
// whenever that znode is deleted, which indicates that the master failed or
// lost its session. It never returns and is meant to run in its own
// goroutine.
func (electionManager *ElectionManager) WatchMaster() {
	masterPath := electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath

	for {
		// A zookeeper watch fires only once, so it is registered again on
		// every iteration; otherwise only the first master failure would
		// ever be noticed.
		children, state, childCh, err := electionManager.ZKClientConn.ChildrenW(masterPath)
		if err == zk.ErrNoNode {
			// The master znode is already gone, so no delete event will
			// arrive for it: elect right away.
			log.Println("start electing new master ...")
			if err := electionManager.electMaster(); err != nil {
				log.Printf("elect new master error: %s", err)
				time.Sleep(time.Second) // back off before retrying
			}
			continue
		}
		if err != nil {
			// No watch was registered, so there is no channel to wait on;
			// pause briefly and try again.
			log.Printf("watch children error: %s", err)
			time.Sleep(time.Second)
			continue
		}
		log.Printf("watch children result. children=%s, state=%v", children, state)

		childEvent := <-childCh
		if childEvent.Type != zk.EventNodeDeleted {
			// Not a deletion of the master znode: just re-arm the watch.
			continue
		}

		log.Printf("receive znode delete event: %v", childEvent)
		// Re-run the election.
		log.Println("start electing new master ...")
		if err := electionManager.electMaster(); err != nil {
			log.Printf("elect new master error: %s", err)
		}
	}
}