-
Notifications
You must be signed in to change notification settings - Fork 9
/
server.go
187 lines (165 loc) · 5.18 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright 2021 Peanutzhen. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package peanutcache
import (
"context"
"fmt"
"github.com/peanutzhen/peanutcache/consistenthash"
pb "github.com/peanutzhen/peanutcache/peanutcachepb"
"github.com/peanutzhen/peanutcache/registry"
"log"
"net"
"strings"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
// server 模块为peanutcache之间提供通信能力
// 这样部署在其他机器上的cache可以通过访问server获取缓存
// 至于找哪台主机 那是一致性哈希的工作了
const (
defaultAddr = "127.0.0.1:6324"
defaultReplicas = 50
)
var (
defaultEtcdConfig = clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
)
// server 和 Group 是解耦合的 所以server要自己实现并发控制
type server struct {
pb.UnimplementedPeanutCacheServer
addr string // format: ip:port
status bool // true: running false: stop
stopSignal chan error // 通知registry revoke服务
mu sync.Mutex
consHash *consistenthash.Consistency
clients map[string]*client
}
// NewServer 创建cache的svr 若addr为空 则使用defaultAddr
func NewServer(addr string) (*server, error) {
if addr == "" {
addr = defaultAddr
}
if !validPeerAddr(addr) {
return nil, fmt.Errorf("invalid addr %s, it should be x.x.x.x:port", addr)
}
return &server{addr: addr}, nil
}
// Get 实现PeanutCache service的Get接口
func (s *server) Get(ctx context.Context, in *pb.GetRequest) (*pb.GetResponse, error) {
group, key := in.GetGroup(), in.GetKey()
resp := &pb.GetResponse{}
log.Printf("[peanutcache_svr %s] Recv RPC Request - (%s)/(%s)", s.addr, group, key)
if key == "" {
return resp, fmt.Errorf("key required")
}
g := GetGroup(group)
if g == nil {
return resp, fmt.Errorf("group not found")
}
view, err := g.Get(key)
if err != nil {
return resp, err
}
resp.Value = view.ByteSlice()
return resp, nil
}
// Start 启动cache服务
func (s *server) Start() error {
s.mu.Lock()
if s.status == true {
s.mu.Unlock()
return fmt.Errorf("server already started")
}
// -----------------启动服务----------------------
// 1. 设置status为true 表示服务器已在运行
// 2. 初始化stop channal,这用于通知registry stop keep alive
// 3. 初始化tcp socket并开始监听
// 4. 注册rpc服务至grpc 这样grpc收到request可以分发给server处理
// 5. 将自己的服务名/Host地址注册至etcd 这样client可以通过etcd
// 获取服务Host地址 从而进行通信。这样的好处是client只需知道服务名
// 以及etcd的Host即可获取对应服务IP 无需写死至client代码中
// ----------------------------------------------
s.status = true
s.stopSignal = make(chan error)
port := strings.Split(s.addr, ":")[1]
lis, err := net.Listen("tcp", ":"+port)
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterPeanutCacheServer(grpcServer, s)
// 注册服务至etcd
go func() {
// Register never return unless stop singnal received
err := registry.Register("peanutcache", s.addr, s.stopSignal)
if err != nil {
log.Fatalf(err.Error())
}
// Close channel
close(s.stopSignal)
// Close tcp listen
err = lis.Close()
if err != nil {
log.Fatalf(err.Error())
}
log.Printf("[%s] Revoke service and close tcp socket ok.", s.addr)
}()
//log.Printf("[%s] register service ok\n", s.addr)
s.mu.Unlock()
if err := grpcServer.Serve(lis); s.status && err != nil {
return fmt.Errorf("failed to serve: %v", err)
}
return nil
}
// SetPeers 将各个远端主机IP配置到Server里
// 这样Server就可以Pick他们了
// 注意: 此操作是*覆写*操作!
// 注意: peersIP必须满足 x.x.x.x:port的格式
func (s *server) SetPeers(peersAddr ...string) {
s.mu.Lock()
defer s.mu.Unlock()
s.consHash = consistenthash.New(defaultReplicas, nil)
s.consHash.Register(peersAddr...)
s.clients = make(map[string]*client)
for _, peerAddr := range peersAddr {
if !validPeerAddr(peerAddr) {
panic(fmt.Sprintf("[peer %s] invalid address format, it should be x.x.x.x:port", peerAddr))
}
service := fmt.Sprintf("peanutcache/%s", peerAddr)
s.clients[peerAddr] = NewClient(service)
}
}
// Pick 根据一致性哈希选举出key应存放在的cache
// return false 代表从本地获取cache
func (s *server) Pick(key string) (Fetcher, bool) {
s.mu.Lock()
defer s.mu.Unlock()
peerAddr := s.consHash.GetPeer(key)
// Pick itself
if peerAddr == s.addr {
log.Printf("ooh! pick myself, I am %s\n", s.addr)
return nil, false
}
log.Printf("[cache %s] pick remote peer: %s\n", s.addr, peerAddr)
return s.clients[peerAddr], true
}
// Stop 停止server运行 如果server没有运行 这将是一个no-op
func (s *server) Stop() {
s.mu.Lock()
if s.status == false {
s.mu.Unlock()
return
}
s.stopSignal <- nil // 发送停止keepalive信号
s.status = false // 设置server运行状态为stop
s.clients = nil // 清空一致性哈希信息 有助于垃圾回收
s.consHash = nil
s.mu.Unlock()
}
// 测试Server是否实现了Picker接口
var _ Picker = (*server)(nil)