-
Notifications
You must be signed in to change notification settings - Fork 1
/
pubsub.go
137 lines (113 loc) · 3.23 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
Package pubsub contains utility functions for working with local broker mqtt.
*/
package pubsub
import (
"fmt"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type Message struct {
Type string `json:"type"`
Value interface{} `json:"value"`
Timestamp float64 `json:"timestamp"`
}
//this type store the status of connection
type PubSub struct {
Conn MQTT.Client
Err chan error
}
var subscriptions map[string]chan []byte
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
// fmt.Printf("TOPIC: %s\n", msg.Topic())
// fmt.Printf("MSG: %s\n", msg.Payload())
vch, ok := subscriptions[string(msg.Topic())]
if ok {
select {
case vch <- msg.Payload():
default:
}
}
}
var onConnection MQTT.OnConnectHandler = func(c MQTT.Client) {
log.Println("OnConnection MQTT")
for k := range subscriptions {
t := c.Subscribe(k, 0, nil)
if t.WaitTimeout(3*time.Second) && t.Error() != nil {
log.Printf("error subzcription: %s", t.Error())
}
}
}
//NewConnection return the PubSub object. nameClient is the client name in local broker.
func NewConnection(nameClient string) (*PubSub, error) {
p := &PubSub{}
opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
opts.SetClientID(nameClient)
opts.SetDefaultPublishHandler(f)
opts.SetOnConnectHandler(onConnection)
opts.SetAutoReconnect(true)
p.Conn = MQTT.NewClient(opts)
token := p.Conn.Connect()
ok := token.WaitTimeout(30 * time.Second)
switch {
case !ok:
return nil, fmt.Errorf("Timeout Error at the beginning of the connection")
case token.Error() != nil:
return nil, token.Error()
}
p.Err = make(chan error)
return p, nil
}
//New return the PubSub object. Without start connection, nameClient is the client name in local broker.
func New(nameClient string) *PubSub {
p := &PubSub{}
subscriptions = make(map[string]chan []byte)
opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
opts.SetClientID(nameClient)
opts.SetDefaultPublishHandler(f)
opts.SetOnConnectHandler(onConnection)
opts.SetAutoReconnect(true)
p.Conn = MQTT.NewClient(opts)
return p
}
//Start connection
func (p *PubSub) Start() error {
token := p.Conn.Connect()
ok := token.WaitTimeout(30 * time.Second)
switch {
case !ok:
return fmt.Errorf("Timeout Error at the beginning of the connection")
case token.Error() != nil:
return token.Error()
}
p.Err = make(chan error)
return nil
}
//AddSubscription add new subcription before Connection
func (p *PubSub) AddSubscription(topic string, ch chan []byte) {
subscriptions[topic] = ch
}
//Publish should be executed with a Go routine. The channel obtains the content that must be sent to the local broker.
func (p *PubSub) Publish(topic string, ch <-chan string) {
if p.Conn == nil {
p.Err <- fmt.Errorf("Nil Connection Error, execute NewConnection()")
return
}
for msg := range ch {
if msg == "EOF" {
log.Println("FINISH publish function")
return
}
token := p.Conn.Publish(topic, 0, false, msg)
if ok := token.WaitTimeout(10 * time.Second); !ok {
p.Err <- fmt.Errorf("timeout Error in publish")
}
// log.Printf("TOPIC: %s; message: %s\n", topic, msg)
}
}
//Disconnect and close error channel
func (p *PubSub) Disconnect() {
close(p.Err)
p.Conn.Disconnect(250)
}