-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmock-device-for-mqtt.go
116 lines (100 loc) · 2.61 KB
/
mock-device-for-mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
* Dependency: go get github.com/eclipse/paho.mqtt.golang
*/
package main
import (
"encoding/json"
MQTT "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
)
// Fixed settings of the mock device: where it connects, how it identifies
// itself, which topics it uses and what it publishes.
const (
	// Broker endpoint and the credentials set on the client options in main.
	BROKER_HOST = "tcp://192.168.56.4:1883"
	USERNAME    = "huaqiao"
	PWD         = "1234"

	// MQTT client identifiers. CLIENTID is the one set in main;
	// RESP_CLIENTID is declared but not referenced in this file.
	CLIENTID      = "Mock-Device-ID"
	RESP_CLIENTID = "Mock-Device-Response-ID"

	// CMD_TOPIC is subscribed to for incoming commands, RESPONSE_TOPIC
	// receives the reply to each command, and DATA_TOPIC receives the
	// payload the device publishes on its own while collection is active.
	CMD_TOPIC      = "CommandTopic"
	RESPONSE_TOPIC = "ResponseTopic"
	DATA_TOPIC     = "DataTopic"

	// PAYLOAD is the constant JSON document published on DATA_TOPIC.
	PAYLOAD = "{\"name\":\"mqtt-device-01\",\"randnum\":\"520.1314\"}"
)
// Mutable package-level state shared by the command handler and the
// data-publishing loop.
var (
	// active holds the collection switch as a string. The publishing loop
	// sends data only while it equals "true". It is overwritten by the
	// "collect" set command and by any value received on msgCh.
	// NOTE(review): it is read and written without synchronization; confirm
	// whether the MQTT callback runs on a different goroutine than the loop.
	active = "false"

	// msgCh is a string channel with a buffer of one, handed to
	// sendDataActiveServer by main. Nothing in this file sends on it.
	msgCh = make(chan string, 1)
)
//var msgRecHandler MQTT.MessageHandler =
// main configures the MQTT client, connects to the broker, starts the
// background data-publishing loop and then keeps the process alive forever.
// It panics if the connection to the broker fails.
func main() {
	// Every setter returns the same options value, so they can be chained.
	opts := MQTT.NewClientOptions().
		AddBroker(BROKER_HOST).
		SetUsername(USERNAME).
		SetPassword(PWD).
		SetClientID(CLIENTID)
	// The subscription to the command topic is made in the connect callback.
	opts.OnConnect = onConnnect

	client := MQTT.NewClient(opts)
	connToken := client.Connect()
	if finished := connToken.Wait(); finished {
		if err := connToken.Error(); err != nil {
			log.Println("can't connect to broker.")
			panic(err)
		}
	}

	go sendDataActiveServer(msgCh, client)

	// Nothing else to do on this goroutine; sleep in a loop so the process
	// does not exit while the client and the publishing loop are running.
	const idle = 3 * time.Second
	for {
		time.Sleep(idle)
	}
}
// onConnnect is the connect callback installed in main. It subscribes to
// CMD_TOPIC at QoS 0 with msgRecHandler as the message callback, and panics
// if the subscription reports an error.
func onConnnect(client MQTT.Client) {
	log.Println("Connect to broker successed. ")

	var handler MQTT.MessageHandler = msgRecHandler
	subToken := client.Subscribe(CMD_TOPIC, 0, handler)
	if completed := subToken.Wait(); completed {
		if err := subToken.Error(); err != nil {
			log.Println("Can't not subscribe " + CMD_TOPIC + " topic.")
			panic(err)
		}
	}

	log.Println("Start subscribe " + CMD_TOPIC + " topic.")
}
// msgRecHandler handles one command message received on CMD_TOPIC.
//
// The payload is decoded as a JSON object whose values are all strings. The
// "cmd" and "method" fields select the action; the result is written back
// into the same map, which is then re-encoded and published on
// RESPONSE_TOPIC at QoS 0, so the reply echoes every field of the request.
//
// Supported commands:
//   - "ping":    sets "ping" to "pong".
//   - "randnum": sets "randnum" to a fixed number string.
//   - "message": method "get" sets "message"; any other method sets "result".
//   - "collect": method "get" reports the current value of active; any other
//     method stores the "param" field into active and sets "result".
//
// Any other command is echoed back unchanged.
//
// A payload that cannot be decoded is logged and dropped without a reply,
// instead of being treated as an empty command. A failed publish is logged.
func msgRecHandler(client MQTT.Client, msg MQTT.Message) {
	payload := msg.Payload()
	log.Printf("Recv msg : %s\n", payload)

	cmdMap := make(map[string]string)
	if err := json.Unmarshal(payload, &cmdMap); err != nil {
		// Without a decoded command there is nothing meaningful to answer.
		log.Printf("Drop command, can't decode payload: %v\n", err)
		return
	}

	method := cmdMap["method"]
	switch cmdMap["cmd"] {
	case "ping":
		cmdMap["ping"] = "pong"
	case "randnum":
		cmdMap["randnum"] = "520.1314"
	case "message":
		if method == "get" {
			cmdMap["message"] = "Are you ok?"
		} else {
			cmdMap["result"] = "set successed."
		}
	case "collect":
		if method == "get" {
			cmdMap["collect"] = active
		} else {
			cmdMap["result"] = "set successed."
			// The value is stored as received; the publishing loop only
			// reacts to the exact string "true".
			active = cmdMap["param"]
		}
	}

	respMsg, err := json.Marshal(cmdMap)
	if err != nil {
		// Do not publish an empty payload when encoding failed.
		log.Println(err)
		return
	}

	if token := client.Publish(RESPONSE_TOPIC, 0, false, respMsg); token.Wait() && token.Error() != nil {
		log.Printf("Can't publish response to %s topic: %v\n", RESPONSE_TOPIC, token.Error())
		return
	}
	log.Println("Response cmd : " + string(respMsg))
}
// sendDataActiveServer runs forever, publishing PAYLOAD on DATA_TOPIC about
// once per second for as long as the package-level active flag equals "true".
//
// ch carries new values for active; each value received replaces it. When no
// value is waiting the loop pauses for 100ms before looking at active again,
// which is also the polling interval while the device is inactive.
//
// If ch is closed, the loop stops receiving from it and keeps running with
// the last value of active. A failed publish is logged and the loop carries
// on. This function never returns.
func sendDataActiveServer(ch <-chan string, client MQTT.Client) {
	for {
		select {
		case msg, ok := <-ch:
			if ok {
				active = msg
			} else {
				// A closed channel is always ready to receive, which would
				// skip the default pause and spin the loop. A nil channel is
				// never ready, so the select falls through to default.
				ch = nil
			}
		default:
			time.Sleep(100 * time.Millisecond)
		}

		if active != "true" {
			continue
		}

		log.Println("send data actively from mock device.")
		log.Println(" " + PAYLOAD)
		if token := client.Publish(DATA_TOPIC, 0, false, PAYLOAD); token.Wait() && token.Error() != nil {
			log.Printf("Can't publish to %s topic: %v\n", DATA_TOPIC, token.Error())
		}
		time.Sleep(1 * time.Second)
	}
}