Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds exponential backoff to MQTT #563

Merged
merged 4 commits into from
Feb 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 64 additions & 27 deletions iot/mqtt_example/cloudiot_mqtt_example_nodejs.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ const jwt = require('jsonwebtoken');
const mqtt = require('mqtt');
// [END iot_mqtt_include]

// The initial backoff time after a disconnection occurs, in seconds.
var MINIMUM_BACKOFF_TIME = 1;

// The maximum backoff time before giving up, in seconds.
var MAXIMUM_BACKOFF_TIME = 32;

// Whether to wait with exponential backoff before publishing.
var shouldBackoff = false;

// The current backoff time.
var backoffTime = 1;

// Whether an asynchronous publish chain is in progress.
var publishChainInProgress = false;

console.log('Google Cloud IoT Core MQTT example.');
var argv = require(`yargs`)
.options({
Expand Down Expand Up @@ -119,21 +134,45 @@ function createJwt (projectId, privateKeyFile, algorithm) {
// [END iot_mqtt_jwt]

// Publish numMessages messages asynchronously, starting from message
// messageCount.
// messagesSent.
// [START iot_mqtt_publish]
function publishAsync (messageCount, numMessages) {
const payload = `${argv.registryId}/${argv.deviceId}-payload-${messageCount}`;
// Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
// Cloud IoT Core also supports qos=0 for at most once delivery.
console.log('Publishing message:', payload);
client.publish(mqttTopic, payload, { qos: 1 });

const delayMs = argv.messageType === 'events' ? 1000 : 2000;
if (messageCount < numMessages) {
// If we have published fewer than numMessage messages, publish payload
// messageCount + 1 in 1 second.
// [START iot_mqtt_jwt_refresh]
function publishAsync (messagesSent, numMessages) {
// If we have published enough messages or backed off too many times, stop.
if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
console.log('Backoff time is too high. Giving up.');
}
console.log('Closing connection to MQTT. Goodbye!');
client.end();
publishChainInProgress = false;
return;
}

// Publish and schedule the next publish.
publishChainInProgress = true;
var publishDelayMs = 0;
if (shouldBackoff) {
publishDelayMs = 1000 * (backoffTime + Math.random());
backoffTime *= 2;
console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
}

setTimeout(function () {
const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

// Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
// Cloud IoT Core also supports qos=0 for at most once delivery.
console.log('Publishing message:', payload);
client.publish(mqttTopic, payload, { qos: 1 }, function (err) {
if (!err) {
shouldBackoff = false;
backoffTime = MINIMUM_BACKOFF_TIME;
}
});

var schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
setTimeout(function () {
// [START iot_mqtt_jwt_refresh]
let secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
if (secsFromIssue > argv.tokenExpMins * 60) {
iatTime = parseInt(Date.now() / 1000);
Expand All @@ -145,15 +184,16 @@ function publishAsync (messageCount, numMessages) {

client.on('connect', (success) => {
console.log('connect');
if (success) {
publishAsync(1, argv.numMessages);
} else {
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
publishAsync(1, argv.numMessages);
}
});

client.on('close', () => {
console.log('close');
shouldBackoff = true;
});

client.on('error', (err) => {
Expand All @@ -168,14 +208,10 @@ function publishAsync (messageCount, numMessages) {
// Note: logging packet send is very verbose
});
}
publishAsync(messageCount + 1, numMessages);
}, delayMs);
// [END iot_mqtt_jwt_refresh]
} else {
// Otherwise, close the connection.
console.log('Closing connection to MQTT. Goodbye!');
client.end();
}
// [END iot_mqtt_jwt_refresh]
publishAsync(messagesSent + 1, numMessages);
}, schedulePublishDelayMs);
}, publishDelayMs);
}
// [END iot_mqtt_publish]

Expand Down Expand Up @@ -213,15 +249,16 @@ const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`;

client.on('connect', (success) => {
console.log('connect');
if (success) {
publishAsync(1, argv.numMessages);
} else {
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
publishAsync(1, argv.numMessages);
}
});

client.on('close', () => {
console.log('close');
shouldBackoff = true;
});

client.on('error', (err) => {
Expand Down