Skip to content

Commit

Permalink
fix(mqtt): Attempt to mitigate problems related to connectivity issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Hypfer committed Nov 15, 2021
1 parent c06d370 commit cf0f2d6
Showing 1 changed file with 62 additions and 15 deletions.
77 changes: 62 additions & 15 deletions backend/lib/mqtt/MqttController.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MqttController {
this.config = options.config;
this.robot = options.robot;

this.semaphores = {
this.mutexes = {
reconfigure: Semaphore(1)
};

Expand All @@ -44,6 +44,8 @@ class MqttController {
/** @public */
this.homieAddICBINVMapProperty = false;

this.mqttClientCloseEventHandler = async () => {};

this.loadConfig();

this.robot.onMapUpdated(() => {
Expand Down Expand Up @@ -315,13 +317,38 @@ class MqttController {

await this.shutdown();
await this.connect();
})().then();
})().then().catch(e => {
Logger.error("Error while handling mqtt client error reconnect", e);
});
}
}
});

this.client.on("reconnect", () => {
Logger.info("Attempting to reconnect to MQTT broker");
});

this.mqttClientCloseEventHandler = async () => {
await this.handleUncleanDisconnect();
};

this.client.on("close", this.mqttClientCloseEventHandler);
}
}

/**
* @private
* @return {Promise<void>}
*/
async handleUncleanDisconnect() {
if (this.state === HomieCommonAttributes.STATE.READY) {
Logger.info("Connection to MQTT broker closed");
}

this.stopAutorefreshService();
await this.setState(HomieCommonAttributes.STATE.LOST);
}

/**
* @private
* Disconnects MQTT client
Expand All @@ -331,7 +358,8 @@ class MqttController {
return;
}

Logger.debug("Disconnecting MQTT Client...");
Logger.info("Disconnecting from the MQTT Broker...");
this.client.removeListener("close", this.mqttClientCloseEventHandler);

const closePromise = new Promise(((resolve, reject) => {
this.client.on("close", (err) => {
Expand All @@ -351,7 +379,7 @@ class MqttController {

this.client = null;
this.asyncClient = null;
Logger.debug("Disconnecting the MQTT Client done");
Logger.info("Successfully disconnected from the MQTT Broker");
}

/**
Expand Down Expand Up @@ -456,7 +484,7 @@ class MqttController {
*/
reconfigure(cb, options) {
return new Promise((resolve, reject) => {
this.semaphores.reconfigure.take(async () => {
this.mutexes.reconfigure.take(async () => {
const reconfOptions = {
reconfigState: HomieCommonAttributes.STATE.INIT,
targetState: HomieCommonAttributes.STATE.READY,
Expand All @@ -472,13 +500,13 @@ class MqttController {
await cb();
await this.setState(reconfOptions.targetState);

this.semaphores.reconfigure.leave();
this.mutexes.reconfigure.leave();
resolve();
} catch (err) {
Logger.error("MQTT reconfiguration error", err);
await this.setState(reconfOptions.errorState);

this.semaphores.reconfigure.leave();
this.mutexes.reconfigure.leave();

reject(err);
}
Expand All @@ -498,8 +526,16 @@ class MqttController {
return;
}

// @ts-ignore
await this.asyncClient.subscribe(Object.keys(topics), {qos: MqttCommonAttributes.QOS.AT_LEAST_ONCE});

try {
// @ts-ignore
await this.asyncClient.subscribe(Object.keys(topics), {qos: MqttCommonAttributes.QOS.AT_LEAST_ONCE});
} catch (e) {
if (e.message !== "client disconnecting" && e.message !== "connection closed") {
throw e;
}
}


Object.assign(this.subscriptions, topics);
}
Expand All @@ -516,7 +552,13 @@ class MqttController {
return;
}

await this.asyncClient.unsubscribe(Object.keys(topics));
try {
await this.asyncClient.unsubscribe(Object.keys(topics));
} catch (e) {
if (e.message !== "client disconnecting" && e.message !== "Connection closed") {
throw e;
}
}

for (const topic of Object.keys(topics)) {
delete this.subscriptions[topic];
Expand All @@ -541,7 +583,9 @@ class MqttController {
// @ts-ignore
await this.asyncClient.publish(handle.getBaseTopic(), value, {qos: handle.getQoS(), retain: handle.retained});
} catch (e) {
Logger.warn("MQTT publication failed, topic " + handle.getBaseTopic(), e);
if (e.message !== "client disconnecting" && e.message !== "connection closed") {
Logger.warn("MQTT publication failed, topic " + handle.getBaseTopic(), e);
}
}
}
}
Expand Down Expand Up @@ -572,9 +616,10 @@ class MqttController {
retain: true
});
} catch (err) {
Logger.warn("Failed to publish Homie attributes for handle " + handle.getBaseTopic() + ". " +
"Maybe you forgot to marshal some value?", err);
throw err;
if (err.message !== "client disconnecting" && err.message !== "connection closed") {
Logger.warn(`Failed to publish Homie attributes for handle ${handle.getBaseTopic()}. Maybe you forgot to marshal some value?`, err);
throw err;
}
}
}
}
Expand Down Expand Up @@ -668,7 +713,9 @@ class MqttController {
try {
await this.asyncClient.publish(topic, message, options);
} catch (err) {
Logger.error("MQTT publication error:", err);
if (err.message !== "client disconnecting" && err.message !== "connection closed") {
Logger.error("MQTT publication error:", err);
}
}
}
}
Expand Down

0 comments on commit cf0f2d6

Please sign in to comment.