Skip to content

Commit

Permalink
feat: emit Keepalive timeout error and speed up tests using fake ti…
Browse files Browse the repository at this point in the history
…mers (#1798)

* chore: speed up tests using fake timers

* fix: typo

* fix: improve some other tests

* fix: improve flush test

* fix: remove clock
  • Loading branch information
robertsLando authored Feb 26, 2024
1 parent e212fa7 commit 5d9bf10
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 51 deletions.
1 change: 1 addition & 0 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._sendPacket({ cmd: 'pingreq' })
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
this.log('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
Expand Down
140 changes: 112 additions & 28 deletions test/abstract_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { IPublishPacket, IPubrelPacket, ISubackPacket, QoS } from 'mqtt-packet'
import { DoneCallback, ErrorWithReasonCode } from 'src/lib/shared'
import { fail } from 'assert'
import { describe, it, beforeEach, afterEach } from 'node:test'
import { nextTick } from 'process'

/**
* These tests try to be consistent with names for servers (brokers) and clients,
Expand Down Expand Up @@ -1309,7 +1310,7 @@ export default function abstractTest(server, config, ports) {
})
}
callback()
}, 100)
}, 10)
}

client.on('message', (topic, message, packet) => {
Expand Down Expand Up @@ -1342,7 +1343,7 @@ export default function abstractTest(server, config, ports) {

const qosTests = [0, 1, 2]
qosTests.forEach((qos) => {
it(`should publish 10 QoS ${qos}and receive them only when \`handleMessage\` finishes`, function _test(t, done) {
it(`should publish 10 QoS ${qos} and receive them only when \`handleMessage\` finishes`, function _test(t, done) {
testQosHandleMessage(qos, done)
})
})
Expand Down Expand Up @@ -2048,40 +2049,83 @@ export default function abstractTest(server, config, ports) {
})

it(
'should reconnect if pingresp is not sent',
'should reconnect on keepalive timeout',
{
timeout: 4000,
timeout: 10000,
},
function _test(t, done) {
const client = connect({ keepalive: 1, reconnectPeriod: 100 })
const clock = sinon.useFakeTimers()

// Fake no pingresp being send by stubbing the _handlePingresp function
client.on('packetreceive', (packet) => {
if (packet.cmd === 'pingresp') {
setImmediate(() => {
client.pingResp = false
})
t.after(() => {
clock.restore()
if (client) {
client.end(true)
throw new Error('Test timed out')
}
})

let client = connect({
keepalive: 60,
reconnectPeriod: 5000,
})

client.once('connect', () => {
client.once('connect', () => {
client.end(true, done)
client.pingResp = false

client.once('error', (err) => {
assert.equal(err.message, 'Keepalive timeout')
client.once('connect', () => {
client.end(true, done)
client = null
})
})

client.once('close', () => {
// Wait for the reconnect to happen
clock.tick(client.options.reconnectPeriod)
})

clock.tick(client.options.keepalive * 1000)
})
},
)

it('should not reconnect if pingresp is successful', function _test(t, done) {
const client = connect({ keepalive: 100 })
client.once('close', () => {
done(new Error('Client closed connection'))
})
setTimeout(() => {
client.removeAllListeners('close')
client.end(true, done)
}, 1000)
})
it(
'should not reconnect if pingresp is successful',
{ timeout: 1000 },
function _test(t, done) {
const clock = sinon.useFakeTimers()

t.after(() => {
clock.restore()
if (client) {
client.end(true)
}
})

let client = connect({ keepalive: 10 })
client.once('close', () => {
done(new Error('Client closed connection'))
})

client.once('connect', () => {
setImmediate(() => {
// make keepalive check trigger
clock.tick(client.options.keepalive * 1000)
})

client.on('packetsend', (packet) => {
if (packet.cmd === 'pingreq') {
client.removeAllListeners('close')
client.end(true, done)
client = null
}
})

clock.tick(1)
})
},
)

it('should defer the next ping when sending a control packet', function _test(t, done) {
const client = connect({ keepalive: 1 })
Expand Down Expand Up @@ -2866,13 +2910,22 @@ export default function abstractTest(server, config, ports) {
})

it('should reconnect after stream disconnect', function _test(t, done) {
const client = connect()
const clock = sinon.useFakeTimers()

t.after(() => {
clock.restore()
})

const client = connect({ reconnectPeriod: 1000 })

let tryReconnect = true

client.on('connect', () => {
if (tryReconnect) {
client.stream.end()
client.once('close', () => {
clock.tick(client.options.reconnectPeriod)
})
tryReconnect = false
} else {
client.end(true, done)
Expand All @@ -2881,7 +2934,15 @@ export default function abstractTest(server, config, ports) {
})

it("should emit 'reconnect' when reconnecting", function _test(t, done) {
const client = connect()
const clock = sinon.useFakeTimers()

t.after(() => {
clock.restore()
})

const client = connect({
reconnectPeriod: 1000,
})
let tryReconnect = true
let reconnectEvent = false

Expand All @@ -2892,6 +2953,9 @@ export default function abstractTest(server, config, ports) {
client.on('connect', () => {
if (tryReconnect) {
client.stream.end()
client.once('close', () => {
clock.tick(client.options.reconnectPeriod)
})
tryReconnect = false
} else {
assert.isTrue(reconnectEvent)
Expand All @@ -2901,7 +2965,14 @@ export default function abstractTest(server, config, ports) {
})

it("should emit 'offline' after going offline", function _test(t, done) {
const client = connect()
const clock = sinon.useFakeTimers()

t.after(() => {
clock.restore()
})
const client = connect({
reconnectPeriod: 1000,
})

let tryReconnect = true
let offlineEvent = false
Expand All @@ -2914,6 +2985,9 @@ export default function abstractTest(server, config, ports) {
if (tryReconnect) {
client.stream.end()
tryReconnect = false
client.once('close', () => {
clock.tick(client.options.reconnectPeriod)
})
} else {
assert.isTrue(offlineEvent)
client.end(true, done)
Expand Down Expand Up @@ -2956,18 +3030,28 @@ export default function abstractTest(server, config, ports) {
timeout: 10000,
},
function _test(t, done) {
const clock = sinon.useFakeTimers()

t.after(() => {
clock.restore()
})

let end
const reconnectSlushTime = 200
const client = connect({ reconnectPeriod: test.period })
let reconnect = false
const start = Date.now()
const start = clock.now

client.on('connect', () => {
if (!reconnect) {
client.stream.end()
client.once('close', () => {
// ensure the tick is done after the reconnect timer is setup (on close)
clock.tick(test.period)
})
reconnect = true
} else {
end = Date.now()
end = clock.now
client.end(() => {
const reconnectPeriodDuringTest = end - start
if (
Expand Down
63 changes: 40 additions & 23 deletions test/client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { useFakeTimers } from 'sinon'
import mqtt from '../src'
import { assert } from 'chai'
import { fork } from 'child_process'
Expand Down Expand Up @@ -181,34 +182,44 @@ describe('MqttClient', () => {
host: 'localhost',
keepalive: 1,
connectTimeout: 350,
reconnectPeriod: 0,
reconnectPeriod: 0, // disable reconnect
})
client.once('connect', () => {
client.publish(
'fakeTopic',
'fakeMessage',
{ qos: 1 },
(err) => {
// connection closed
assert.exists(err)
pubCallbackCalled = true
},
)
client.unsubscribe('fakeTopic', (err, result) => {
// connection closed
assert.exists(err)
unsubscribeCallbackCalled = true
})
setTimeout(() => {
client.end((err1) => {
assert.strictEqual(
pubCallbackCalled && unsubscribeCallbackCalled,
true,
'callbacks not invoked',
)
server2.close((err2) => {
done(err1 || err2)

client.once('error', (err) => {
assert.equal(err.message, 'Keepalive timeout')
const originalFLush = client['_flush']
// flush will be called on _cleanUp because of keepalive timeout
client['_flush'] = function _flush() {
originalFLush.call(client)
client.end((err1) => {
assert.strictEqual(
pubCallbackCalled &&
unsubscribeCallbackCalled,
true,
'callbacks should be invoked with error',
)
server2.close((err2) => {
done(err1 || err2)
})
})
})
}, 5000)
}
})
})
},
)
Expand All @@ -218,7 +229,7 @@ describe('MqttClient', () => {
it(
'should attempt to reconnect once server is down',
{
timeout: 30000,
timeout: 5000,
},
function _test(t, done) {
const args = ['-r', 'ts-node/register']
Expand Down Expand Up @@ -344,7 +355,7 @@ describe('MqttClient', () => {
it(
'should not keep requeueing the first message when offline',
{
timeout: 2500,
timeout: 1000,
},
function _test(t, done) {
const server2 = serverBuilder('mqtt').listen(ports.PORTAND45)
Expand All @@ -365,16 +376,22 @@ describe('MqttClient', () => {
})
})

setTimeout(() => {
if (client.queue.length === 0) {
debug('calling final client.end()')
client.end(true, (err) => done(err))
} else {
debug('calling client.end()')
// Do not call done. We want to trigger a reconnect here.
client.end(true)
let reconnections = 0

client.on('reconnect', () => {
reconnections++
if (reconnections === 2) {
if (client.queue.length === 0) {
debug('calling final client.end()')
client.end(true, (err) => done(err))
} else {
debug('calling client.end()')
// Do not call done. We want to trigger a reconnect here.
client.end(true)
done(Error('client queue not empty'))
}
}
}, 2000)
})
},
)

Expand Down

0 comments on commit 5d9bf10

Please sign in to comment.