-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.js
executable file
·140 lines (124 loc) · 4.96 KB
/
mqtt.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* http://www.steves-internet-guide.com/using-node-mqtt-client/
* https://github.com/mqttjs/async-mqtt
*
* Configure Tasmota with:
Backlog mqtthost 10.20.30.40; mqttport 1883; mqttuser <username>; mqttpassword <password>; topic <device_topic>;
*/
const mqtt = require('async-mqtt')
const winston = require('winston')
const { v4: uuidv4 } = require('uuid')
module.exports = function(config, god) {
var self = {
logger: {},
client: {},
triggers: {},
init: function() {
this.logger = winston.loggers.get('mqtt')
this.client = mqtt.connect(config.server, { clientId: config.clientId } )
this.logger.info("Connecting to mqtt server %s as %s", config.server, config.clientId)
this.publish = this.client.publish.bind(this.client)
this.client.on("error", async (error) => {
this.logger.error("Can't connect" + error)
// TODO Steve says this only happens on auth failures and they are non-recoverable - other errors don't trigger this callback
})
this.client.on("connect", async () => {
this.logger.info("Connected " + this.client.connected)
// this.client.subscribe('#') // for debugging or finding new messages - warning: breaks retained message handling
})
this.client.on('message', this._onMessage.bind(this))
god.terminateListeners.push(this.onTerminate.bind(this))
},
onTerminate: async function() {
this.logger.debug('Closing connection')
await this.client.end()
},
_onMessage: async function(topic, message, packet) {
let topic2 = topic
let loop = true
let found = false
this.logger.silly("MQTT raw packet: %o", packet)
while(loop) {
let trigger = this.triggers[topic2]
this.logger.silly("Known triggers for topic %s: \n%o", topic2, trigger)
if (trigger) {
found = true
let keys = Object.keys(trigger)
if (keys.length == 0) {
this.logger.info('Trigger found for %s, but no callbacks defined', topic2)
} else {
for(let i=0; i < keys.length; i++) {
let t = trigger[keys[i]]
if (!t) {
this.logger.error("Known triggers for topic %s: \n%o\n%s keys: %o", topic2, trigger, keys.length, keys)
this.logger.error("Thinking of it, triggers are:\n%o\nwith %s keys: %o", this.triggers[topic2], Object.keys(this.triggers[topic2]).length, Object.keys(this.triggers[topic2]))
this.logger.error("Couldn't find trigger for keys[%s]=%s", i, keys[i])
} else {
this.logger.info(t.id + ": " + message.toString())
await t.callback(t, topic, message, packet)
}
}
}
}
// go one level more generic
if (topic2.indexOf('/') > 0) {
topic2 = topic2.replace(/(^|\/)[^/#]+(\/#)?$/, '/#')
} else {
loop = false
}
}
if (!found) {
// unrecognized mqtt message
this.logger.debug("unrecognized: " + topic + " -> " + message.toString().substr(0, 200))
return
}
},
/** adds a MQTT topic trigger
* topic: the MQTT topic.
* id: ID which will be passed to the callback (as trigger.id)
* callback: function(trigger, topic, message, packet)
* returns the trigger uuid, which can be used to remove the trigger again
*/
addTrigger: async function(topic, id, callback) {
let subscribe = false
if (!this.triggers[topic]) {
this.triggers[topic] = {}
this.logger.info("Subscribing to %s", topic)
subscribe = true
}
let uuid = uuidv4()
this.triggers[topic][uuid] = {
uuid: uuid,
id: id,
callback: callback,
}
this.logger.debug("Adding trigger %s (%s) to subscription for %s", id, uuid, topic)
if (subscribe) {
await this.client.subscribe(topic)
}
return uuid
},
removeTrigger: async function(topic, uuid) {
if (!this.triggers[topic]) {
this.logger.warn("Trying to remove trigger %s, but no active subscription for topic %s", uuid, topic)
return
}
if (!this.triggers[topic][uuid]) {
this.logger.warn("Trying to remove trigger %s for %s, but trigger not found", uuid, topic)
return
}
this.logger.debug("Removing trigger '%s' (%s) from subscription for %s", this.triggers[topic][uuid].id, uuid, topic)
delete this.triggers[topic][uuid]
if (!Object.keys(this.triggers[topic]).length) {
this.logger.info("Unsubscribing from " + topic)
await this.client.unsubscribe(topic)
delete this.triggers[topic]
}
},
publish: async(topic) => { // gets overwritten with this.client.publish(topic, message) in init()
this.logger.error("Trying to publish to topic %s before mqtt was initialized", topic)
},
}
self.init()
return self
}