-
Notifications
You must be signed in to change notification settings - Fork 5
/
server.go
193 lines (167 loc) · 5.04 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package CoapPubsub
import (
"log"
"net"
"github.com/dustin/go-coap"
)
type (
	// chanMapStringList indexes subscriptions by client: each client
	// address maps to the list of topics recorded for it. Keys are
	// pointers, so lookups use pointer identity, not address value.
	chanMapStringList map[*net.UDPAddr][]string

	// stringMapChanList indexes subscriptions by topic: each topic name
	// maps to the list of client addresses recorded for it.
	stringMapChanList map[string][]*net.UDPAddr
)
// CoapPubsubServer is the state of one CoAP publish/subscribe server: the
// two subscription indexes plus the counter used for outgoing message IDs.
//
// NOTE(review): the struct carries no lock; confirm that the handler is
// never invoked concurrently before relying on these maps being race-free.
type CoapPubsubServer struct {
	// capacity is the size passed to NewCoapPubsubServer; it is used there
	// as the initial size hint of both maps.
	capacity int

	// msgIndex is the most recently issued message ID; genMsgID increments
	// it so that successive outgoing messages get distinct IDs.
	msgIndex uint16

	// clientMapTopics maps a client address to the topics it subscribed
	// to; used to find a client's subscriptions.
	clientMapTopics chanMapStringList

	// topicMapClients maps a topic to the client addresses subscribed to
	// it; used to fan a published message out.
	topicMapClients stringMapChanList
}
// NewCoapPubsubServer creates a pubsub server that speaks the CoAP protocol.
//
// maxChannel is stored as the server capacity and used as the initial size
// hint for both subscription maps; a value of at least 1024 is suggested
// for basic usage. The message-ID counter is seeded from GetIPv4Int16 plus
// GetLocalRandomInt, and the seed is logged.
func NewCoapPubsubServer(maxChannel int) *CoapPubsubServer {
	srv := &CoapPubsubServer{
		capacity:        maxChannel,
		clientMapTopics: make(map[*net.UDPAddr][]string, maxChannel),
		topicMapClients: make(map[string][]*net.UDPAddr, maxChannel),
	}

	// Seed the counter only after the maps exist, as the original did.
	srv.msgIndex = GetIPv4Int16() + GetLocalRandomInt()
	log.Println("Init msgID=", srv.msgIndex)

	return srv
}
// genMsgID advances the server's message-ID counter by one and returns the
// new value. The counter is a uint16, so it wraps to 0 after 65535.
//
// NOTE(review): the increment is not synchronized; confirm callers never
// run concurrently.
func (c *CoapPubsubServer) genMsgID() uint16 {
	c.msgIndex++
	return c.msgIndex
}
// removeSubscription deletes the subscription of client to topic from both
// indexes (topic -> clients and client -> topics). It is a no-op when no
// such subscription is recorded. A topic or client whose list becomes empty
// is removed from its map entirely.
//
// Clients are matched by address value (IP, port and zone), not by pointer
// identity. Two distinct *net.UDPAddr values can name the same endpoint
// (presumably the server loop hands the handler a fresh pointer for every
// datagram; confirm against the coap package), and with a pointer
// comparison such a request could never remove the entry stored at
// subscribe time. Every matching entry is removed, so duplicates left
// behind by earlier pointer-based bookkeeping are cleaned up as well.
func (c *CoapPubsubServer) removeSubscription(topic string, client *net.UDPAddr) {
	sameAddr := func(x, y *net.UDPAddr) bool {
		if x == nil || y == nil {
			return x == y
		}
		return x.Port == y.Port && x.Zone == y.Zone && x.IP.Equal(y.IP)
	}

	// topic -> clients: filter the subscriber list in place.
	if subscribers, ok := c.topicMapClients[topic]; ok {
		kept := subscribers[:0]
		for _, addr := range subscribers {
			if !sameAddr(addr, client) {
				kept = append(kept, addr)
			}
		}
		switch {
		case len(kept) == len(subscribers):
			// client was not subscribed; leave the entry untouched.
		case len(kept) == 0:
			delete(c.topicMapClients, topic)
		default:
			c.topicMapClients[topic] = kept
		}
	}

	// client -> topics: the map is keyed by pointer, so a direct lookup
	// would miss an equal address held under a different pointer. Scan the
	// keys instead. Deleting or updating existing keys while ranging over a
	// map is permitted in Go.
	for addr, topics := range c.clientMapTopics {
		if !sameAddr(addr, client) {
			continue
		}
		kept := topics[:0]
		for _, name := range topics {
			if name != topic {
				kept = append(kept, name)
			}
		}
		switch {
		case len(kept) == len(topics):
			// topic was not recorded for this client.
		case len(kept) == 0:
			delete(c.clientMapTopics, addr)
		default:
			c.clientMapTopics[addr] = kept
		}
	}
}
// addSubscription records that client is subscribed to topic in both
// indexes (topic -> clients and client -> topics). Adding a subscription
// that already exists changes nothing.
//
// Clients are matched by address value (IP, port and zone), not by pointer
// identity. Two distinct *net.UDPAddr values can name the same endpoint
// (presumably the server loop hands the handler a fresh pointer for every
// datagram; confirm against the coap package). With a pointer comparison,
// each repeated request from one endpoint would add a duplicate subscriber
// and a separate client key. When an equal address is already a key of
// clientMapTopics, that existing key is reused.
func (c *CoapPubsubServer) addSubscription(topic string, client *net.UDPAddr) {
	sameAddr := func(x, y *net.UDPAddr) bool {
		if x == nil || y == nil {
			return x == y
		}
		return x.Port == y.Port && x.Zone == y.Zone && x.IP.Equal(y.IP)
	}

	// topic -> clients
	subscribed := false
	for _, addr := range c.topicMapClients[topic] {
		if sameAddr(addr, client) {
			subscribed = true
			break
		}
	}
	if !subscribed {
		c.topicMapClients[topic] = append(c.topicMapClients[topic], client)
	}

	// client -> topics: try the cheap pointer lookup first, then fall back
	// to scanning for a key that holds an equal address.
	key := client
	if _, ok := c.clientMapTopics[client]; !ok {
		for addr := range c.clientMapTopics {
			if sameAddr(addr, client) {
				key = addr
				break
			}
		}
	}
	for _, name := range c.clientMapTopics[key] {
		if name == topic {
			return
		}
	}
	c.clientMapTopics[key] = append(c.clientMapTopics[key], topic)
}
// publish sends msg to every client currently subscribed to topic, using
// connection l, and logs each delivery. When the topic has no entry in the
// topic index it returns immediately without logging anything.
func (c *CoapPubsubServer) publish(l *net.UDPConn, topic string, msg string) {
	subscribers, known := c.topicMapClients[topic]
	if !known {
		return
	}

	for i := range subscribers {
		target := subscribers[i]
		c.publishMsg(l, target, topic, msg)
		log.Println("topic->", topic, " PUB to ", target, " msg=", msg)
	}
	log.Println("pub finished")
}
// handleCoAPMessage processes one incoming CoAP message m received from
// client a on connection l.
//
// The topic is the first path segment of the message (empty when the
// message has no path). The command is derived from the message's ETag
// option via ParseUint8ToString (defined elsewhere; presumably it turns the
// option bytes into a string). Recognised commands:
//
//	ADDSUB  subscribe a to the topic
//	REMSUB  unsubscribe a from the topic
//	PUB     publish the payload to the topic's subscribers
//	HB      heart beat, only acknowledged
//
// Each recognised command is answered through responseOK. Any other command
// is logged and left unanswered. After dispatch, the whole topic index is
// logged. The function always returns nil because replies are transmitted
// explicitly instead of being returned to the caller.
func (c *CoapPubsubServer) handleCoAPMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
	// Read the path once and guard the index with a length check, so an
	// empty (nil or non-nil) path can never cause an out-of-range panic.
	var topic string
	if path := m.Path(); len(path) > 0 {
		topic = path[0]
	}

	cmd := ParseUint8ToString(m.Option(coap.ETag))
	log.Println("cmd=", cmd, " topic=", topic, " msg=", string(m.Payload))
	log.Println("code=", m.Code, " option=", cmd)

	switch cmd {
	case "ADDSUB":
		log.Println("add sub topic=", topic, " in client=", a)
		c.addSubscription(topic, a)
		c.responseOK(l, a, m)
	case "REMSUB":
		log.Println("remove sub topic=", topic, " in client=", a)
		c.removeSubscription(topic, a)
		c.responseOK(l, a, m)
	case "PUB":
		c.publish(l, topic, string(m.Payload))
		c.responseOK(l, a, m)
	case "HB":
		// For a heart beat request just return OK.
		log.Println("Got heart beat from ", a)
		c.responseOK(l, a, m)
	default:
		// Previously dropped without a trace; make it visible in the log.
		log.Println("ignoring unknown cmd=", cmd, " from ", a)
	}

	// Dump the current subscription table (map order is unspecified).
	for k, v := range c.topicMapClients {
		log.Println("Topic=", k, " sub by client=>", v)
	}
	return nil
}
// ListenAndServe starts listening on the given UDP address and serves
// requests with handleCoAPMessage until a fatal error occurs. The value
// returned by coap.ListenAndServe is passed to log.Fatal, so this method
// terminates the process instead of returning to its caller.
func (c *CoapPubsubServer) ListenAndServe(udpPort string) {
	handler := coap.FuncHandler(c.handleCoAPMessage)
	err := coap.ListenAndServe("udp", udpPort, handler)
	log.Fatal(err)
}
// responseOK acknowledges request m by sending an Acknowledgement message
// with code Content to client a over connection l. The reply reuses the
// request's message ID and payload, declares a text/plain content format
// and carries the request path as its Location-Path option. A transmit
// failure is logged and otherwise ignored.
//
// NOTE(review): only Type, Code, MessageID and Payload are copied into the
// reply; confirm whether the request token also needs to be echoed.
func (c *CoapPubsubServer) responseOK(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) {
	ack := coap.Message{
		Type:      coap.Acknowledgement,
		Code:      coap.Content,
		MessageID: m.MessageID,
		Payload:   m.Payload,
	}
	ack.SetOption(coap.ContentFormat, coap.TextPlain)
	ack.SetOption(coap.LocationPath, m.Path())

	if err := coap.Transmit(l, a, ack); err != nil {
		log.Printf("Error on transmitter, stopping: %v", err)
	}
}
// publishMsg delivers one published message to a single subscriber a over
// connection l. The outgoing message is Confirmable with code Content,
// takes a fresh ID from genMsgID, carries msg as its payload, declares a
// text/plain content format and names the topic in its Location-Path
// option. The message is logged before sending; a transmit failure is
// logged and otherwise ignored.
func (c *CoapPubsubServer) publishMsg(l *net.UDPConn, a *net.UDPAddr, topic string, msg string) {
	notification := coap.Message{
		Type:      coap.Confirmable,
		Code:      coap.Content,
		MessageID: c.genMsgID(),
		Payload:   []byte(msg),
	}
	notification.SetOption(coap.ContentFormat, coap.TextPlain)
	notification.SetOption(coap.LocationPath, topic)

	log.Printf("Transmitting %v msg=%s", notification, msg)
	if err := coap.Transmit(l, a, notification); err != nil {
		log.Printf("Error on transmitter, stopping: %v", err)
	}
}