-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqttsub.go
132 lines (130 loc) · 3.33 KB
/
mqttsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"github.com/godaner/brokerc/broker"
"github.com/godaner/brokerc/broker/mqttv1"
"github.com/urfave/cli"
"os"
"os/signal"
)
var MQTTSubscribeCommand = cli.Command{
Name: "mqttsub",
Usage: "subscribe mqtt message",
UsageText: "Usage: brokerc mqttsub [options...] <uri>, uri arg format: mqtt[s]://[username][:password]@host.domain[:port]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "t",
Usage: "topic.",
Required: true,
},
cli.StringFlag{
Name: "i",
Usage: "client id.",
Required: false,
},
cli.BoolFlag{
Name: "d",
Usage: "debug.",
Required: false,
},
cli.IntFlag{
Name: "q",
Usage: "quality of service level to use for all messages. Defaults to 0.",
Required: false,
},
cli.BoolFlag{
Name: "c",
Usage: "disable 'clean session' (store subscription and pending messages when client disconnects).",
Required: false,
},
cli.StringFlag{
Name: "cafile",
Usage: "path to a file containing trusted CA certificates to enable encrypted communication.",
Required: false,
},
cli.StringFlag{
Name: "cert",
Usage: "client certificate for authentication, if required by server.",
Required: false,
},
cli.StringFlag{
Name: "key",
Usage: "client private key for authentication, if required by server.",
Required: false,
},
cli.BoolFlag{
Name: "insecure",
Usage: "do not check that the server certificate hostname matches the remote hostname.",
Required: false,
},
cli.StringFlag{
Name: "will-payload",
Usage: "payload for the client Will, which is sent by the broker in case of unexpected disconnection.",
Required: false,
},
cli.StringFlag{
Name: "will-topic",
Usage: "the topic on which to publish the client Will.",
Required: false,
},
cli.BoolFlag{
Name: "will-retain",
Usage: "if given, make the client Will retained.",
Required: false,
},
cli.StringFlag{
Name: "will-qos",
Usage: "QoS level for the client Will.",
Required: false,
},
},
Action: func(context *cli.Context) error {
uri := context.Args().Get(0)
i, t, d, q, c, wt, wp, wr, wq, cafile, cert, key, insecure :=
context.String("i"),
context.String("t"),
context.Bool("d"),
context.Int("q"),
context.Bool("c"),
context.String("will-topic"),
context.String("will-payload"),
context.Bool("will-retain"),
context.Int("will-qos"),
context.String("cafile"),
context.String("cert"),
context.String("key"),
context.Bool("insecure")
logger.SetDebug(d)
b := mqttv1.MQTTBrokerV1{
URI: uri,
CID: i,
WT: wt,
WP: wp,
WR: wr,
WQ: byte(wq),
C: c,
CACertFile: cafile,
CertFile: cert,
KeyFile: key,
Insecure: insecure,
Logger: logger,
Debug: d,
}
err := b.Connect()
if err != nil {
return err
}
defer b.Disconnect()
s, err := b.Subscribe([]string{t}, func(event broker.Event) error {
logger.Infof("SUBSCRIBE=> uri:%v, i:%v, t:%v, q:%v, c:%v, m:%v !", uri, i, t, q, c, string(event.Message().Body))
return nil
}, broker.SetSubQOS(q))
if err != nil {
return err
}
defer s.Unsubscribe()
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
<-sig
return nil
},
}