Skip to content

Commit

Permalink
mqtt keepalive improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
phoddie committed Feb 1, 2024
1 parent 3413a62 commit b461d09
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 47 deletions.
41 changes: 23 additions & 18 deletions examples/io/tcp/mqttclient/mqttclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class MQTTClient {
socket.format = BufferFormat;

if ("connected" !== this.#state) {
const remaining = this.#options.remaining, byteLength = data.byteLength;
if (("publishing" !== this.#state) || options || (byteLength > remaining))
const byteLength = data.byteLength;
if (("publishing" !== this.#state) || options || (byteLength > this.#options.remaining))
throw new Error;

socket.write(data);
Expand All @@ -143,7 +143,6 @@ class MQTTClient {
if (0 === this.#options.remaining) {
delete this.#options.remaining;
this.#state = "connected";
this.#options.last = Date.now();
}

return (this.#writable > Overhead) ? (this.#writable - Overhead) : 0;
Expand Down Expand Up @@ -247,8 +246,12 @@ class MQTTClient {
this.#writable -= length;
} break;

case MQTTClient.DISCONNECT:

case MQTTClient.PINGREQ:
if (this.#options.keepalive)
this.#options.keepalive.write = Date.now();
// fall through
case MQTTClient.DISCONNECT:
if (2 > this.#writable)
throw new Error("overflow");

Expand All @@ -260,9 +263,6 @@ class MQTTClient {
throw new Error("unknown");
}

if ("connected" === this.#state)
this.#options.last = Date.now();

return (this.#writable > Overhead) ? (this.#writable - Overhead) : 0;
}
read(count = this.#payload) {
Expand Down Expand Up @@ -297,7 +297,7 @@ class MQTTClient {
this.#options.timer ??= Timer.set(() => {
delete this.#options.timer;
if (this.#readable > 0)
this.#onReadable(this.#readable); // this results in this.#options.last being advanced inaccurately
this.#onReadable(this.#readable);
});
}

Expand Down Expand Up @@ -614,7 +614,7 @@ class MQTTClient {
Timer.clear(options.connecting);
delete options.connecting;

const keepalive = Math.round(options.keepalive / 1000);
const keepalive = Math.ceil(options.keepalive / 1000);
const id = makeStringBuffer(options.id);
const user = makeStringBuffer(options.user);
const password = makeStringBuffer(options.password);
Expand Down Expand Up @@ -664,7 +664,7 @@ class MQTTClient {
if (keepalive) {
options.keepalive = Timer.repeat(() => this.#keepalive(), keepalive * 250);
options.keepalive.interval = keepalive * 1000;
options.last = Date.now();
options.keepalive.read = options.keepalive.write = Date.now();
}

this.#state = "login";
Expand Down Expand Up @@ -692,6 +692,10 @@ class MQTTClient {
#parsed(msg) {
const operation = msg.operation;
// traceOperation(false, operation);

if ((operation == MQTTClient.PINGRESP) && this.#options.keepalive)
this.#options.keepalive.read = Date.now();

if (MQTTClient.CONNACK === operation) {
if (msg.returnCode)
return void this.#onError("connection rejected")
Expand Down Expand Up @@ -725,17 +729,18 @@ class MQTTClient {
return true;
}
#keepalive() {
const options = this.#options;
const interval = options.keepalive.interval;
const options = this.#options, keepalive = options.keepalive, interval = keepalive.interval;
const now = Date.now();
if ((options.last + (interval >> 1)) > now)
return; // wrote control packet within the keepalive interval

if ((options.last + (interval + (interval >> 1))) < now)
return void this.#onError("time out"); // no response in too long
if ((now - keepalive.read) >= (keepalive.interval + (keepalive.interval >> 1)))
return void this.#onError("time out"); // no control packet received in 1.5x keepalive interval (expected PINGRESP)

if ((now - keepalive.write) < (((keepalive.interval >> 2) * 3) - 500))
return;

for (let i = 0, queue = this.#options.pending, length = queue.length; i < length; i++) {
if (queue[i].keepalive && (MQTTClient.PINGREQ === queue[i].operation))
// haven't sent a ping in (just under) 3/4 the keep alive interval
for (let i = 0, pending = options.pending, length = pending.length; i < length; i++) {
if (pending[i].keepalive && (MQTTClient.PINGREQ === pending[i].operation))
return void this.#onError("time out"); // unsent keepalive ping, exit
}

Expand Down
58 changes: 29 additions & 29 deletions modules/network/mqtt/mqtt.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 Moddable Tech, Inc.
* Copyright (c) 2016-2024 Moddable Tech, Inc.
*
* This file is part of the Moddable SDK Runtime.
*
Expand Down Expand Up @@ -47,7 +47,7 @@ const SUBSCRIBE = 0x80;
const UNSUBSCRIBE = 0xA0;

export default class Client {
#timer;
#timer; // keepalive
#messages;

constructor(dictionary) {
Expand Down Expand Up @@ -200,6 +200,21 @@ export default class Client {
case 1:
parse.state = buffer[position] & 0x80 ? 2 : parse.code;
parse.length = buffer[position++] & 0x7F;
if ((2 !== parse.state) && !parse.length) {
if (0xC0 === parse.state) { // PINGREQ
this.ws.write(Uint8Array.of(0xD0, 0x00).buffer); // PINGRESP
}
else if (0xD0 === parse.state) { // PINGRESP
if (this.#timer)
this.#timer.read = Date.now();
}
else if (0xE0 === parse.state) { // DISCONNECT
return;
}
else
return this.fail("bad state");
parse = this.parse = {state: 0};
}
break;

case 2:
Expand Down Expand Up @@ -418,26 +433,10 @@ export default class Client {
parse = this.parse = {state: 0}; // not dispatched
break;

case 0xC0: // PINGREQ
this.ws.write(Uint8Array.of(0xD0, 0x00).buffer); // PINGRESP
parse = this.parse = {state: 0};
break;

case 0xD0: // PINGRESP (ignored)
parse = this.parse = {state: 0};
break;

case 0xE0: // DISCONNECT
debugger; //@@ mosquitto_pub doesn't transmit this?
break;

default:
return this.fail("bad parse state");
}
}

if (this.#timer)
this.last = Date.now();
}
connected() {
this.onConnected?.();
Expand Down Expand Up @@ -496,7 +495,7 @@ export default class Client {

if (timeout) {
this.#timer = Timer.repeat(this.keepalive.bind(this), this.timeout >> 2);
this.last = Date.now();
this.#timer.read = this.#timer.write = Date.now();
}

delete this.connect;
Expand Down Expand Up @@ -577,19 +576,20 @@ export default class Client {
}
keepalive() {
const now = Date.now();
if ((this.last + this.timeout) > now)
return; // received data within the timeout interval
if ((now - this.#timer.read) >= (this.timeout + (this.timeout >> 1)))
return void this.fail("read time out"); // nothing received in 1.5x keep alive interval is fail for client and server (in write-only client, it means client didn't receive ping response)

if ((this.last + (this.timeout + (this.timeout >> 1))) > now) {
try {
this.ws.write(Uint8Array.of(0xC0, 0x00).buffer); // ping
}
catch {
this.fail("write failed");
if (!this.server) { // client must send something within the keepalive interval. we always ping within that interval (ensures that write-only client occassionally receives something from server)
if ((now - this.#timer.write) >= (((this.timeout >> 2) * 3) - 500)) { // haven't sent a ping in (just under) 3/4 the keep alive interval
try {
this.#timer.write = now;
this.ws.write(Uint8Array.of(0xC0, 0x00).buffer); // ping
}
catch {
this.fail("write failed");
}
}
}
else
this.fail("time out"); // timed out after 1.5x waiting
}
fail(msg = "") {
trace("MQTT FAIL: ", msg, "\n");
Expand Down

0 comments on commit b461d09

Please sign in to comment.