-
Notifications
You must be signed in to change notification settings - Fork 4
/
router_persistent.go
143 lines (130 loc) · 3.07 KB
/
router_persistent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2018 Blockchain-CN . All rights reserved.
// https://github.com/Blockchain-CN
package pheromones
import (
"fmt"
"io"
"net"
"sync"
"time"
)
// 长连接对象
type endPointP struct {
c net.Conn
}
// PRouter 长连接路由
type PRouter struct {
sync.RWMutex
sync.WaitGroup
to time.Duration
// 长链接池
Pool map[string]endPointP
}
// NewPRouter 创建长连接路由
func NewPRouter(to time.Duration) *PRouter {
var r PRouter
r.to = to
r.Pool = make(map[string]endPointP, 0)
return &r
}
// AddRoute 添加路由时,已添加或者地址为空是都返回有错误,防止收到请求和主动连接重复建立
// 如果名字相同且连接符不同,则将原来的地址删除
func (r *PRouter) AddRoute(name string, addr interface{}) error {
if _, ok := addr.(net.Conn); !ok {
return Error(ErrRemoteSocketMisType)
}
if addr.(net.Conn) == nil {
return Error(ErrRemoteSocketEmpty)
}
if _, ok := r.Pool[name]; ok {
if addr.(net.Conn) == r.Pool[name].c {
return Error(ErrRemoteSocketExist)
}
r.Delete(name)
}
r.Lock()
r.Pool[name] = endPointP{addr.(net.Conn)}
r.Unlock()
fmt.Printf("添加路由, peername=@%s@||peeraddress=%s\n", name, addr.(net.Conn).RemoteAddr())
return nil
}
// Delete 删除某个peer
func (r *PRouter) Delete(name string) error {
r.Lock()
defer r.Unlock()
if _, ok := r.Pool[name]; !ok {
return Error(ErrRemoteSocketEmpty)
}
r.Pool[name].c.Close()
delete(r.Pool, name)
return nil
}
// GetConnType 获取连接类型
func (r *PRouter) GetConnType() ConnType {
return PersistentConnection
}
// DispatchAll 广播消息
func (r *PRouter) DispatchAll(msg []byte) map[string][]byte {
for k, v := range r.Pool {
r.Add(1)
go func(name string, c net.Conn) {
defer r.Done()
defer func() {
if err := recover(); err != nil {
fmt.Printf("panic: %v", err)
}
}()
fmt.Printf("dispatchall||发送请求, peername=%s||peeraddr=%s||msg=%s\n", name, c.RemoteAddr(), string(msg))
r.RLock()
c.SetWriteDeadline(time.Now().Add(r.to))
_, err := c.Write(msg)
r.RUnlock()
if err != nil {
r.Delete(name)
}
}(k, v.c)
}
r.Wait()
return nil
}
// 获取全部对象
func (r *PRouter) FetchPeers() map[string]interface{} {
p2 := make(map[string]interface{})
r.RLock()
defer r.RUnlock()
for k, v := range r.Pool {
p2[k] = v
}
return p2
}
// Dispatch 单点传输
func (r *PRouter) Dispatch(name string, msg []byte) ([]byte, error) {
r.RLock()
if _, ok := r.Pool[name]; !ok {
return nil, Error(ErrUnknuowPeer)
}
fmt.Printf("发送请求, peername=%s||msg=%s\n", name, string(msg))
r.Pool[name].c.SetWriteDeadline(time.Now().Add(r.to))
_, err := r.Pool[name].c.Write(msg)
r.RUnlock()
if err != nil {
r.Delete(name)
}
return nil, err
}
func (r *PRouter) read(io io.Reader, to time.Duration) ([]byte, error) {
buf := make([]byte, defultByte)
messnager := make(chan int)
go func() {
n, _ := io.Read(buf[:])
messnager <- n
close(messnager)
}()
select {
case n := <-messnager:
return buf[:n], nil
case <-time.After(to):
return nil, Error(ErrLocalSocketTimeout)
}
return buf, nil
}