From e9702d258eab289210e737c3c0cb4bb23bedff16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Fri, 19 Jul 2024 14:34:46 -0400 Subject: [PATCH 1/8] fix: tets hang up --- test/abstract_client.ts | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 218ecfa0f..57e049923 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -3202,15 +3202,30 @@ export default function abstractTest(server, config, ports) { }) it('should emit connack timeout error', function _test(t, done) { - const client = connect({ - connectTimeout: 0, - reconnectPeriod: 5000, + // Use fake timers to simulate the timeout. The setTimeout inside the client connection + // will inactive by other tests (maybe) causing this test never ends. + const clock = sinon.useFakeTimers({ + ...fakeTimersOptions, + toFake: ['setTimeout'], }) - client.on('error', (err) => { - assert.equal(err.message, 'connack timeout') - client.end(true, done) + const connectTimeout = 10 + + t.after(() => { + clock.restore() }) + + const client = connect({ + connectTimeout, + reconnectPeriod: 5000, + }) + .on('connect', () => { + clock.tick(connectTimeout) + }) + .on('error', (err) => { + assert.equal(err.message, 'connack timeout') + client.end(true, done) + }) }) it( From a141420a38256d5a172d8be74ea9fcae7f51c6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Fri, 26 Jul 2024 11:37:07 -0400 Subject: [PATCH 2/8] fix(test): close open connections --- src/lib/client.ts | 5 +- src/lib/handlers/ack.ts | 14 +- test/abstract_client.ts | 386 ++++++++++++++++++++++++++++++++++------ 3 files changed, 340 insertions(+), 65 deletions(-) diff --git a/src/lib/client.ts b/src/lib/client.ts index b9fa98480..de0420137 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -385,7 +385,10 @@ export type OnMessageCallback = ( export type OnPacketCallback = (packet: Packet) => void export type OnCloseCallback = () => void export type OnErrorCallback = (error: Error | ErrorWithReasonCode) => void -export type PacketCallback = (error?: Error, packet?: Packet) => any +export type PacketCallback = ( + error?: Error | ErrorWithReasonCode, + packet?: Packet, +) => any export type CloseCallback = (error?: Error) => void export interface MqttClientEventCallbacks { diff --git a/src/lib/handlers/ack.ts b/src/lib/handlers/ack.ts index 066d30c6e..83f902966 100644 --- a/src/lib/handlers/ack.ts +++ b/src/lib/handlers/ack.ts @@ -1,6 +1,6 @@ // Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND, ETIMEDOUT. -import { PacketHandler } from '../shared' +import { PacketHandler, ErrorWithReasonCode } from '../shared' export const ReasonCodes = { 0: '', @@ -82,8 +82,10 @@ const handleAck: PacketHandler = (client, packet) => { const pubackRC = packet.reasonCode // Callback - we're done if (pubackRC && pubackRC > 0 && pubackRC !== 16) { - err = new Error(`Publish error: ${ReasonCodes[pubackRC]}`) - err.code = pubackRC + err = new ErrorWithReasonCode( + `Publish error: ${ReasonCodes[pubackRC]}`, + pubackRC, + ) client['_removeOutgoingAndStoreMessage'](messageId, () => { cb(err, packet) }) @@ -102,8 +104,10 @@ const handleAck: PacketHandler = (client, packet) => { const pubrecRC = packet.reasonCode if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { - err = new Error(`Publish error: ${ReasonCodes[pubrecRC]}`) - err.code = pubrecRC + err = new ErrorWithReasonCode( + `Publish error: ${ReasonCodes[pubrecRC]}`, + pubrecRC, + ) client['_removeOutgoingAndStoreMessage'](messageId, () => { cb(err, packet) }) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 57e049923..5f7e5ec18 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -20,7 +20,7 @@ import mqtt, { import { IPublishPacket, IPubrelPacket, ISubackPacket, QoS } from 'mqtt-packet' import { DoneCallback, ErrorWithReasonCode } from 'src/lib/shared' import { fail } from 'assert' -import { describe, it, beforeEach, afterEach } from 'node:test' +import { describe, it, beforeEach, afterEach, after } from 'node:test' /** * These tests try to be consistent with names for servers (brokers) and clients, @@ -47,6 +47,54 @@ const fakeTimersOptions = { shouldClearNativeTimers: true, } +class CleanBeforeMethod { + #methods: { + method: Promise | ((...args: any[]) => Promise) + args: any[] + }[] + + constructor() { + this.#methods = [] + } + + add Promise>( + method: Promise | T, + ...args: Parameters + ) { + this.#methods.push({ method, args }) + } + + async executeAll(options?: { + /** + * If `true`, all methods will be removed after execution. + */ + removeAll?: boolean + }) { + if (this.#methods.length === 0) { + return + } + + const results = await Promise.allSettled( + this.#methods.map(({ method, args }) => { + if (method instanceof Promise) { + return method + } + return method(...args) + }), + ) + + if (options?.removeAll) { + this.#methods = [] + } + + for (const result of results) { + if (result.status === 'rejected') { + Promise.reject(result.reason) + } + } + } +} + export default function abstractTest(server, config, ports) { const version = config.protocolVersion || 4 @@ -58,6 +106,10 @@ export default function abstractTest(server, config, ports) { return mqtt.connect(opts) } + const cleanBeforeMethod = new CleanBeforeMethod() + const beforeEachExec = () => + cleanBeforeMethod.executeAll({ removeAll: true }) + describe('closing', () => { it('should emit close if stream closes', function _test(t, done) { const client = connect() @@ -601,6 +653,9 @@ export default function abstractTest(server, config, ports) { }) describe('offline messages', () => { + beforeEach(beforeEachExec) + after(beforeEachExec) + it('should queue message until connected', function _test(t, done) { const client = connect() @@ -645,7 +700,25 @@ export default function abstractTest(server, config, ports) { }) it('should not interrupt messages', function _test(t, done) { - let client = null + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + + let client: mqtt.MqttClient | null = null let publishCount = 0 const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) @@ -683,9 +756,7 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload4', ) - client.end((err1) => { - server2.close((err2) => done(err1 || err2)) - }) + client.end(false, done) } }) }) @@ -715,9 +786,33 @@ export default function abstractTest(server, config, ports) { }) it('should not overtake the messages stored in the level-db-store', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + + await new Promise((resolve) => { + fs.rm(storePath, { recursive: true }, () => { + resolve() + }) + }) + }) + const storePath = fs.mkdtempSync('test-store_') const store = levelStore(storePath) - let client = null + let client: mqtt.MqttClient | null = null const incomingStore = store.incoming const outgoingStore = store.outgoing let publishCount = 0 @@ -751,11 +846,7 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - - server2.close((err) => { - fs.rmSync(storePath, { recursive: true }) - done(err) - }) + client.end(false, done) break } }) @@ -778,9 +869,7 @@ export default function abstractTest(server, config, ports) { client.once('close', () => { client.once('connect', () => { client.publish('test', 'payload2', { qos: 1 }) - client.publish('test', 'payload3', { qos: 1 }, () => { - client.end(false) - }) + client.publish('test', 'payload3', { qos: 1 }) }) // reconecting client.reconnect(clientOptions) @@ -883,6 +972,9 @@ export default function abstractTest(server, config, ports) { }) describe('publishing', () => { + beforeEach(beforeEachExec) + after(beforeEachExec) + it('should publish a message (offline)', function _test(t, done) { const client = connect() const payload = 'test' @@ -1105,10 +1197,28 @@ export default function abstractTest(server, config, ports) { }) it('should fire a callback (qos 1) on error', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + // 145 = Packet Identifier in use const pubackReasonCode = 145 - const pubOpts = { qos: 1 } - let client = null + const pubOpts: IClientPublishOptions = { qos: 1 } + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { @@ -1147,15 +1257,18 @@ export default function abstractTest(server, config, ports) { (err, packet?: mqtt.Packet) => { assert.exists(packet) if (version === 5) { - assert.strictEqual(err.code, pubackReasonCode) + if (err instanceof ErrorWithReasonCode) { + assert.strictEqual( + err.code, + pubackReasonCode, + ) + } else { + assert.instanceOf(err, ErrorWithReasonCode) + } } else { assert.ifError(err) } - setImmediate(() => { - client.end(() => { - server2.close(done) - }) - }) + done() }, ) }) @@ -1175,10 +1288,28 @@ export default function abstractTest(server, config, ports) { }) it('should fire a callback (qos 2) on error', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + // 145 = Packet Identifier in use const pubrecReasonCode = 145 - const pubOpts = { qos: 2 } - let client = null + const pubOpts: IClientPublishOptions = { qos: 2 } + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { @@ -1221,15 +1352,18 @@ export default function abstractTest(server, config, ports) { (err, packet?: mqtt.Packet) => { assert.exists(packet) if (version === 5) { - assert.strictEqual(err.code, pubrecReasonCode) + if (err instanceof ErrorWithReasonCode) { + assert.strictEqual( + err.code, + pubrecReasonCode, + ) + } else { + assert.instanceOf(err, ErrorWithReasonCode) + } } else { assert.ifError(err) } - setImmediate(() => { - client.end(true, () => { - server2.close(done) - }) - }) + done() }, ) }) @@ -1689,6 +1823,24 @@ export default function abstractTest(server, config, ports) { ) it('should keep message order', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let publishCount = 0 let reconnect = false let client: mqtt.MqttClient @@ -1725,9 +1877,7 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - client.end((err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() break } } @@ -3009,6 +3159,9 @@ export default function abstractTest(server, config, ports) { }) describe('auto reconnect', () => { + beforeEach(beforeEachExec) + after(beforeEachExec) + it('should mark the client disconnecting if #end called', function _test(t, done) { const client = connect() @@ -3532,8 +3685,27 @@ export default function abstractTest(server, config, ports) { }) it('should not resubscribe when reconnecting if suback is error', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let tryReconnect = true let reconnectEvent = false + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', (packet) => { const connack = @@ -3552,7 +3724,7 @@ export default function abstractTest(server, config, ports) { }) server2.listen(ports.PORTAND49, () => { - const client = connect({ + client = connect({ port: ports.PORTAND49, host: 'localhost', reconnectPeriod: 100, @@ -3580,17 +3752,33 @@ export default function abstractTest(server, config, ports) { Object.keys(client['_resubscribeTopics']).length, 0, ) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } }) }) }) it('should preserved incomingStore after disconnecting if clean is false', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3624,9 +3812,7 @@ export default function abstractTest(server, config, ports) { }) }) serverClient.on('pubcomp', (packet) => { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() }) }) @@ -3655,8 +3841,26 @@ export default function abstractTest(server, config, ports) { }) it('should clear outgoing if close from server', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', (packet) => { const connack = @@ -3695,7 +3899,7 @@ export default function abstractTest(server, config, ports) { client.on('close', () => { if (reconnect) { - server2.close((err) => done(err)) + done() } else { assert.strictEqual( Object.keys(client.outgoing).length, @@ -3709,8 +3913,26 @@ export default function abstractTest(server, config, ports) { }) it('should resend in-flight QoS 1 publish messages from the client if clean is false', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3721,9 +3943,7 @@ export default function abstractTest(server, config, ports) { }) serverClient.on('publish', (packet) => { if (reconnect) { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } else { client.end(true, () => { client.reconnect({ @@ -3757,8 +3977,26 @@ export default function abstractTest(server, config, ports) { }) it('should resend in-flight QoS 2 publish messages from the client if clean is false', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3769,9 +4007,7 @@ export default function abstractTest(server, config, ports) { }) serverClient.on('publish', (packet) => { if (reconnect) { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } else { client.end(true, () => { client.reconnect({ @@ -3805,8 +4041,26 @@ export default function abstractTest(server, config, ports) { }) it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3855,9 +4109,7 @@ export default function abstractTest(server, config, ports) { (err) => { assert(reconnect) assert.ifError(err) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() }, ) } @@ -3867,10 +4119,28 @@ export default function abstractTest(server, config, ports) { }) it('should resend in-flight publish messages by published order', function _test(t, done) { + cleanBeforeMethod.add(async () => { + if (client) { + await new Promise((resolve) => { + client.end(true, () => { + resolve() + }) + }) + } + + if (server2) { + await new Promise((resolve) => { + server2.close(() => { + resolve() + }) + }) + } + }) + let publishCount = 0 let reconnect = false let disconnectOnce = true - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3904,9 +4174,7 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() break } } else if (disconnectOnce) { From 19e6c3f9e744df33cdf57f02f1a662d0c212ff49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Fri, 26 Jul 2024 11:51:50 -0400 Subject: [PATCH 3/8] test: catch error --- test/abstract_client.ts | 195 ++++++++++++++++++++++------------------ 1 file changed, 110 insertions(+), 85 deletions(-) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 5f7e5ec18..66fc7888f 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -89,7 +89,8 @@ class CleanBeforeMethod { for (const result of results) { if (result.status === 'rejected') { - Promise.reject(result.reason) + if (result.reason instanceof Error) throw result.reason + else throw new Error(result.reason) } } } @@ -702,17 +703,19 @@ export default function abstractTest(server, config, ports) { it('should not interrupt messages', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -788,17 +791,19 @@ export default function abstractTest(server, config, ports) { it('should not overtake the messages stored in the level-db-store', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -1199,17 +1204,19 @@ export default function abstractTest(server, config, ports) { it('should fire a callback (qos 1) on error', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -1290,17 +1297,19 @@ export default function abstractTest(server, config, ports) { it('should fire a callback (qos 2) on error', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -1825,17 +1834,19 @@ export default function abstractTest(server, config, ports) { it('should keep message order', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -3687,17 +3698,19 @@ export default function abstractTest(server, config, ports) { it('should not resubscribe when reconnecting if suback is error', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -3761,17 +3774,19 @@ export default function abstractTest(server, config, ports) { it('should preserved incomingStore after disconnecting if clean is false', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -3843,17 +3858,19 @@ export default function abstractTest(server, config, ports) { it('should clear outgoing if close from server', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -3915,17 +3932,19 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 1 publish messages from the client if clean is false', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -3979,17 +3998,19 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 2 publish messages from the client if clean is false', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -4043,17 +4064,19 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } @@ -4121,17 +4144,19 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight publish messages by published order', function _test(t, done) { cleanBeforeMethod.add(async () => { if (client) { - await new Promise((resolve) => { - client.end(true, () => { - resolve() + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() }) }) } - if (server2) { - await new Promise((resolve) => { - server2.close(() => { - resolve() + if (server2?.listening) { + await new Promise((resolve, reject) => { + server2.close((err) => { + if (err) reject(err) + else resolve() }) }) } From 25da706d9e0c546e1e3c47c2710f16778b66acf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Wed, 31 Jul 2024 11:10:47 -0400 Subject: [PATCH 4/8] feat: move clean_method class to help --- test/abstract_client.ts | 367 +++++++---------------------------- test/helpers/clean_method.ts | 311 +++++++++++++++++++++++++++++ 2 files changed, 380 insertions(+), 298 deletions(-) create mode 100644 test/helpers/clean_method.ts diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 66fc7888f..c5d6f911b 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -8,6 +8,7 @@ import levelStore from 'mqtt-level-store' import Store from '../src/lib/store' import serverBuilder from './server_helpers_for_client_tests' import handlePubrel from '../src/lib/handlers/pubrel' +import CleanMethod from './helpers/clean_method' import handle from '../src/lib/handlers/index' import handlePublish from '../src/lib/handlers/publish' import mqtt, { @@ -47,55 +48,6 @@ const fakeTimersOptions = { shouldClearNativeTimers: true, } -class CleanBeforeMethod { - #methods: { - method: Promise | ((...args: any[]) => Promise) - args: any[] - }[] - - constructor() { - this.#methods = [] - } - - add Promise>( - method: Promise | T, - ...args: Parameters - ) { - this.#methods.push({ method, args }) - } - - async executeAll(options?: { - /** - * If `true`, all methods will be removed after execution. - */ - removeAll?: boolean - }) { - if (this.#methods.length === 0) { - return - } - - const results = await Promise.allSettled( - this.#methods.map(({ method, args }) => { - if (method instanceof Promise) { - return method - } - return method(...args) - }), - ) - - if (options?.removeAll) { - this.#methods = [] - } - - for (const result of results) { - if (result.status === 'rejected') { - if (result.reason instanceof Error) throw result.reason - else throw new Error(result.reason) - } - } - } -} - export default function abstractTest(server, config, ports) { const version = config.protocolVersion || 4 @@ -107,9 +59,20 @@ export default function abstractTest(server, config, ports) { return mqtt.connect(opts) } - const cleanBeforeMethod = new CleanBeforeMethod() - const beforeEachExec = () => - cleanBeforeMethod.executeAll({ removeAll: true }) + const cleanMethod = new CleanMethod() + + async function beforeEachExec() { + await cleanMethod.closeClientAndServer() + await cleanMethod.executeAllMethods() + } + + async function afterExec() { + await cleanMethod.closeClientAndServer() + await cleanMethod.executeAllMethods() + cleanMethod.reset() + } + + after(afterExec) describe('closing', () => { it('should emit close if stream closes', function _test(t, done) { @@ -655,7 +618,7 @@ export default function abstractTest(server, config, ports) { describe('offline messages', () => { beforeEach(beforeEachExec) - after(beforeEachExec) + after(afterExec) it('should queue message until connected', function _test(t, done) { const client = connect() @@ -701,26 +664,6 @@ export default function abstractTest(server, config, ports) { }) it('should not interrupt messages', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let client: mqtt.MqttClient | null = null let publishCount = 0 const incomingStore = new mqtt.Store({ clean: false }) @@ -764,6 +707,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -785,29 +730,12 @@ export default function abstractTest(server, config, ports) { }) client.publish('test', 'payload1', { qos: 2 }) client.publish('test', 'payload2', { qos: 2 }) + cleanMethod.setClient(client) }) }) it('should not overtake the messages stored in the level-db-store', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - + cleanMethod.add({ executeOnce: true }, async () => { await new Promise((resolve) => { fs.rm(storePath, { recursive: true }, () => { resolve() @@ -821,7 +749,6 @@ export default function abstractTest(server, config, ports) { const incomingStore = store.incoming const outgoingStore = store.outgoing let publishCount = 0 - const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { const connack = @@ -857,6 +784,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + const clientOptions = { port: ports.PORTAND72, host: 'localhost', @@ -873,8 +802,9 @@ export default function abstractTest(server, config, ports) { client.once('close', () => { client.once('connect', () => { - client.publish('test', 'payload2', { qos: 1 }) - client.publish('test', 'payload3', { qos: 1 }) + client.publish('test', 'payload2', { qos: 1 }, () => { + client.publish('test', 'payload3', { qos: 1 }) + }) }) // reconecting client.reconnect(clientOptions) @@ -889,6 +819,8 @@ export default function abstractTest(server, config, ports) { }, }) }) + + cleanMethod.setClient(client) }) }) @@ -978,7 +910,7 @@ export default function abstractTest(server, config, ports) { describe('publishing', () => { beforeEach(beforeEachExec) - after(beforeEachExec) + after(afterExec) it('should publish a message (offline)', function _test(t, done) { const client = connect() @@ -1202,26 +1134,6 @@ export default function abstractTest(server, config, ports) { }) it('should fire a callback (qos 1) on error', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - // 145 = Packet Identifier in use const pubackReasonCode = 145 const pubOpts: IClientPublishOptions = { qos: 1 } @@ -1247,6 +1159,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND72, () => { client = connect({ port: ports.PORTAND72, @@ -1279,6 +1193,8 @@ export default function abstractTest(server, config, ports) { }, ) }) + + cleanMethod.setClient(client) }) }) @@ -1295,31 +1211,10 @@ export default function abstractTest(server, config, ports) { }) it('should fire a callback (qos 2) on error', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - // 145 = Packet Identifier in use const pubrecReasonCode = 145 const pubOpts: IClientPublishOptions = { qos: 2 } let client: mqtt.MqttClient | null = null - const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { const connack = @@ -1344,6 +1239,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND103, () => { client = connect({ port: ports.PORTAND103, @@ -1376,6 +1273,8 @@ export default function abstractTest(server, config, ports) { }, ) }) + + cleanMethod.setClient(client) }) }) @@ -1832,26 +1731,6 @@ export default function abstractTest(server, config, ports) { ) it('should keep message order', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let publishCount = 0 let reconnect = false let client: mqtt.MqttClient @@ -1895,6 +1774,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -1925,6 +1806,8 @@ export default function abstractTest(server, config, ports) { reconnect = true } }) + + cleanMethod.setClient(client) }) }) @@ -3171,7 +3054,7 @@ export default function abstractTest(server, config, ports) { describe('auto reconnect', () => { beforeEach(beforeEachExec) - after(beforeEachExec) + after(afterExec) it('should mark the client disconnecting if #end called', function _test(t, done) { const client = connect() @@ -3696,26 +3579,6 @@ export default function abstractTest(server, config, ports) { }) it('should not resubscribe when reconnecting if suback is error', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let tryReconnect = true let reconnectEvent = false let client: mqtt.MqttClient | null = null @@ -3736,6 +3599,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND49, () => { client = connect({ port: ports.PORTAND49, @@ -3768,30 +3633,12 @@ export default function abstractTest(server, config, ports) { done() } }) + + cleanMethod.setClient(client) }) }) it('should preserved incomingStore after disconnecting if clean is false', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let reconnect = false let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) @@ -3831,6 +3678,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3852,30 +3701,12 @@ export default function abstractTest(server, config, ports) { assert.strictEqual(topic, 'topic') assert.strictEqual(message.toString(), 'payload') }) + + cleanMethod.setClient(client) }) }) it('should clear outgoing if close from server', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let reconnect = false let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3896,6 +3727,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3926,30 +3759,12 @@ export default function abstractTest(server, config, ports) { client.reconnect() } }) + + cleanMethod.setClient(client) }) }) it('should resend in-flight QoS 1 publish messages from the client if clean is false', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let reconnect = false let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) @@ -3975,6 +3790,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3992,30 +3809,12 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) + + cleanMethod.setClient(client) }) }) it('should resend in-flight QoS 2 publish messages from the client if clean is false', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let reconnect = false let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) @@ -4041,6 +3840,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -4058,30 +3859,12 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) + + cleanMethod.setClient(client) }) }) it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let reconnect = false let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) @@ -4112,6 +3895,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -4138,30 +3923,12 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) + + cleanMethod.setClient(client) }) }) it('should resend in-flight publish messages by published order', function _test(t, done) { - cleanBeforeMethod.add(async () => { - if (client) { - await new Promise((resolve, reject) => { - client.end(true, (err) => { - if (err) reject(err) - else resolve() - }) - }) - } - - if (server2?.listening) { - await new Promise((resolve, reject) => { - server2.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - } - }) - let publishCount = 0 let reconnect = false let disconnectOnce = true @@ -4215,6 +3982,8 @@ export default function abstractTest(server, config, ports) { }) }) + cleanMethod.setServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -4236,6 +4005,8 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) + + cleanMethod.setClient(client) }) }) diff --git a/test/helpers/clean_method.ts b/test/helpers/clean_method.ts new file mode 100644 index 000000000..103bf4950 --- /dev/null +++ b/test/helpers/clean_method.ts @@ -0,0 +1,311 @@ +import type { MqttClient } from 'src' +import { randomUUID } from 'node:crypto' +import serverBuilder from '../server_helpers_for_client_tests' + +type ServerBuilderInstance = ReturnType + +type AddOptions = { + /** + * @description + * If `true`, the method will be executed only one time and then removed from the store. + * + * @default true + */ + executeOnce?: boolean +} + +type ResetMethodOptions = { + /** + * @description + * If `true`, only the methods that have the option `executeOnce` set to `true` will be removed. + * + * @default false + */ + removeOnce?: boolean +} + +type ResetOptions = { + method?: ResetMethodOptions +} + +/** + * @description + * Class to help clean the environment or close opened connections after tests finish. + * Also, you can add custom methods to be executed after the tests finish, like + * deleting temporary files or closing connections. + * + * @example + * ``` + * import { describe, it } from 'node:test' + * import mqtt from './src' + * import serverBuilder from './test/server_helpers_for_client_tests' + * import CleanMethod from './test/helpers/clean_method' + * + * + * describe('Test', () => { + * const cleanMethod = new CleanMethod() + * + * it('should clean the client and server', (t, done) => { + * t.after(async () => { + * await cleanMethod.closeClientAndServer() + * }) + * + * const server = serverBuilder('8883') + * const client = mqtt.connect('mqtt://localhost') + * + * cleanMethod.setServer(server) + * cleanMethod.setClient(client) + * }) + * }) + * ``` + * + * @example + * ``` + * import { describe, it, after } from 'node:test' + * import mqtt from './src' + * import serverBuilder from './test/server_helpers_for_client_tests' + * import CleanMethod from './test/helpers/clean_method' + * + * + * describe('Test', () => { + * + * const cleanMethod = new CleanMethod() + * let server = serverBuilder('8883') + * + * cleanMethod.add({}, async () => { + * if (server?.listening) { + * await new Promise((resolve, reject) => { + * server.close((err) => { + * if (err) reject(err) + * else resolve() + * }) + * }) + * } + * }) + * + * after(async () => { + * await cleanMethod.closeAllMethods() + * }) + * + * it('should clean the client and server', (t, done) => { + * server = serverBuilder('8883') + * const client = mqtt.connect('mqtt://localhost') + * + * cleanMethod.setClient(client) + * done() + * }) + * + * }) + * ``` + */ +class CleanMethod { + #client: MqttClient | null + + #server: ServerBuilderInstance | null + + #methods: Map< + string, + { + options: AddOptions + method: Promise | ((...args: any[]) => Promise) + args: any[] + } + > + + constructor() { + this.#client = null + this.#server = null + this.#methods = new Map() + } + + /** + * @description + * Set the client to be used as default to close. + */ + setClient(client: MqttClient | null) { + this.#client = client + } + + /** + * @description + * Set the server to be used as default to close. + */ + setServer(server: ServerBuilderInstance | null) { + this.#server = server + } + + /** + * @param options Options to be passed to the method. + * @param method It can be a promise or a function that returns a promise. + * @param args Arguments to be passed to the method. + * + * @description + * Add a method to be executed + */ + add Promise>( + options: AddOptions | undefined, + method: Promise | T, + ...args: Parameters + ): string { + const id = randomUUID() + + this.#methods.set(id, { + method, + args, + options: { executeOnce: true, ...options }, + }) + + return id + } + + /** + * + * @description + * Restart the class to its initial state. + * Set the client and server to `null` and remove all methods stored. + */ + reset(options?: ResetOptions) { + this.#client = null + this.#server = null + + this.resetMethods(options?.method) + } + + /** + * @description + * Remove all methods stored. + */ + resetMethods(options?: ResetMethodOptions) { + if (options?.removeOnce) { + for (const [id, { options: methodOptions }] of this.#methods) { + if (methodOptions.executeOnce) { + this.#methods.delete(id) + } + } + } else { + this.#methods.clear() + } + } + + /** + * @description + * Close the `client` connection. + * + * @default + * Use the `client` set in the class. + */ + async closeClient(client?: MqttClient | null) { + const clientToClean = client ?? this.#client + + if (clientToClean) { + await new Promise((resolve, reject) => { + clientToClean.end(true, (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + } + + /** + * @description + * Close the `server` connection. + * + * @default + * Use the `server` set in the class. + */ + async closeServer(server?: ServerBuilderInstance | null) { + const serverToClean = server ?? this.#server + + if (serverToClean?.listening) { + await new Promise((resolve, reject) => { + serverToClean.close((err) => { + if (err) reject(err) + else resolve() + }) + }) + } + } + + /** + * @description + * Close the `client` and `server` connections. + * + * @default + * Use the `client` and `server` set in the class. + */ + async closeClientAndServer(options?: { + client?: MqttClient | null + server?: ServerBuilderInstance | null + }) { + await this.closeClient(options?.client) + await this.closeServer(options?.server) + } + + /** + * @param id Method id to be executed. + * + * @description + * Execute a method stored by its id + * If the method has the option `executeOnce` set to `true`, it will be removed after execution. + */ + async executeMethod(id: string) { + const method = this.#methods.get(id) + + if (!method) { + return + } + + if (method.options.executeOnce) { + this.#methods.delete(id) + } + + if (method.method instanceof Promise) { + await method.method + } else { + await method.method(...method.args) + } + } + + /** + * @description + * Execute all methods stored. + * If a method has the option `executeOnce` set to `true`, it will be removed after execution. + */ + async executeAllMethods() { + if (this.#methods.size === 0) { + return + } + + const methods: Array> = [] + + for (const [id, { method, options, args }] of this.#methods) { + if (method instanceof Promise) { + methods.push(method) + } else { + const promise = new Promise((resolve, reject) => { + method(...args) + .then(resolve) + .catch(reject) + }) + + methods.push(promise) + } + + if (options.executeOnce) { + this.#methods.delete(id) + } + } + + const results = await Promise.allSettled(methods) + + for (const result of results) { + if (result.status === 'rejected') { + if (result.reason instanceof Error) throw result.reason + else throw new Error(result.reason) + } + } + } +} + +export default CleanMethod From 54a56ac74f176dafd4db841c435eadfcdc940f58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Wed, 31 Jul 2024 11:54:20 -0400 Subject: [PATCH 5/8] fix: hang tests --- test/abstract_client.ts | 46 +++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index c5d6f911b..958a2a2b5 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -64,6 +64,7 @@ export default function abstractTest(server, config, ports) { async function beforeEachExec() { await cleanMethod.closeClientAndServer() await cleanMethod.executeAllMethods() + cleanMethod.reset({ method: { removeOnce: true } }) } async function afterExec() { @@ -720,6 +721,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, queueQoSZero: true, }) + cleanMethod.setClient(client) client.on('packetreceive', (packet) => { if (packet.cmd === 'connack') { setImmediate(() => { @@ -730,7 +732,6 @@ export default function abstractTest(server, config, ports) { }) client.publish('test', 'payload1', { qos: 2 }) client.publish('test', 'payload2', { qos: 2 }) - cleanMethod.setClient(client) }) }) @@ -800,6 +801,8 @@ export default function abstractTest(server, config, ports) { server2.listen(ports.PORTAND72, () => { client = connect(clientOptions) + cleanMethod.setClient(client) + client.once('close', () => { client.once('connect', () => { client.publish('test', 'payload2', { qos: 1 }, () => { @@ -820,7 +823,6 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setClient(client) }) }) @@ -1170,6 +1172,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + cleanMethod.setClient(client) + client.once('connect', () => { client.publish( 'a', @@ -1193,8 +1197,6 @@ export default function abstractTest(server, config, ports) { }, ) }) - - cleanMethod.setClient(client) }) }) @@ -1250,6 +1252,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + cleanMethod.setClient(client) + client.once('connect', () => { client.publish( 'a', @@ -1273,8 +1277,6 @@ export default function abstractTest(server, config, ports) { }, ) }) - - cleanMethod.setClient(client) }) }) @@ -1787,6 +1789,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload1', { qos: 1 }) @@ -1806,8 +1810,6 @@ export default function abstractTest(server, config, ports) { reconnect = true } }) - - cleanMethod.setClient(client) }) }) @@ -3608,6 +3610,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 100, }) + cleanMethod.setClient(client) + client.on('reconnect', () => { reconnectEvent = true }) @@ -3633,8 +3637,6 @@ export default function abstractTest(server, config, ports) { done() } }) - - cleanMethod.setClient(client) }) }) @@ -3691,6 +3693,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client.on('connect', () => { if (!reconnect) { client.subscribe('test', { qos: 2 }, () => {}) @@ -3701,8 +3705,6 @@ export default function abstractTest(server, config, ports) { assert.strictEqual(topic, 'topic') assert.strictEqual(message.toString(), 'payload') }) - - cleanMethod.setClient(client) }) }) @@ -3739,6 +3741,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + cleanMethod.setClient(client) + client.on('connect', () => { client.subscribe('test', { qos: 2 }, (e) => { if (!e) { @@ -3759,8 +3763,6 @@ export default function abstractTest(server, config, ports) { client.reconnect() } }) - - cleanMethod.setClient(client) }) }) @@ -3803,14 +3805,14 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload', { qos: 1 }) } }) client.on('error', () => {}) - - cleanMethod.setClient(client) }) }) @@ -3853,14 +3855,14 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload', { qos: 2 }) } }) client.on('error', () => {}) - - cleanMethod.setClient(client) }) }) @@ -3908,6 +3910,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client.on('connect', () => { if (!reconnect) { client.publish( @@ -3923,8 +3927,6 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) - - cleanMethod.setClient(client) }) }) @@ -3995,6 +3997,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + cleanMethod.setClient(client) + client['nextId'] = 65535 client.on('connect', () => { @@ -4005,8 +4009,6 @@ export default function abstractTest(server, config, ports) { } }) client.on('error', () => {}) - - cleanMethod.setClient(client) }) }) From 72add25efe8e3f36b396eedfa1784cd22aedbf7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Wed, 31 Jul 2024 11:57:04 -0400 Subject: [PATCH 6/8] fix: lint style --- test/abstract_client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 958a2a2b5..9a33d8d94 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -822,7 +822,6 @@ export default function abstractTest(server, config, ports) { }, }) }) - }) }) From 0290cdae55ffab3d6b681f9accb185b3881bc650 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 1 Aug 2024 08:52:25 +0200 Subject: [PATCH 7/8] fix: teardown helper functionality --- test/abstract_client.ts | 64 ++++++++--------- .../{clean_method.ts => TeardownHelper.ts} | 72 +++++-------------- 2 files changed, 48 insertions(+), 88 deletions(-) rename test/helpers/{clean_method.ts => TeardownHelper.ts} (78%) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 9a33d8d94..d4441318a 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -8,7 +8,7 @@ import levelStore from 'mqtt-level-store' import Store from '../src/lib/store' import serverBuilder from './server_helpers_for_client_tests' import handlePubrel from '../src/lib/handlers/pubrel' -import CleanMethod from './helpers/clean_method' +import TeardownHelper from './helpers/TeardownHelper' import handle from '../src/lib/handlers/index' import handlePublish from '../src/lib/handlers/publish' import mqtt, { @@ -59,18 +59,16 @@ export default function abstractTest(server, config, ports) { return mqtt.connect(opts) } - const cleanMethod = new CleanMethod() + const teardownHelper = new TeardownHelper() async function beforeEachExec() { - await cleanMethod.closeClientAndServer() - await cleanMethod.executeAllMethods() - cleanMethod.reset({ method: { removeOnce: true } }) + await teardownHelper.runAll() + teardownHelper.reset({ removeOnce: true }) } async function afterExec() { - await cleanMethod.closeClientAndServer() - await cleanMethod.executeAllMethods() - cleanMethod.reset() + await teardownHelper.runAll() + teardownHelper.reset() } after(afterExec) @@ -708,7 +706,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -721,7 +719,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, queueQoSZero: true, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('packetreceive', (packet) => { if (packet.cmd === 'connack') { setImmediate(() => { @@ -736,7 +734,7 @@ export default function abstractTest(server, config, ports) { }) it('should not overtake the messages stored in the level-db-store', function _test(t, done) { - cleanMethod.add({ executeOnce: true }, async () => { + teardownHelper.add({ executeOnce: true }, async () => { await new Promise((resolve) => { fs.rm(storePath, { recursive: true }, () => { resolve() @@ -785,7 +783,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) const clientOptions = { port: ports.PORTAND72, @@ -801,7 +799,7 @@ export default function abstractTest(server, config, ports) { server2.listen(ports.PORTAND72, () => { client = connect(clientOptions) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.once('close', () => { client.once('connect', () => { @@ -1160,7 +1158,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND72, () => { client = connect({ @@ -1171,7 +1169,7 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.once('connect', () => { client.publish( @@ -1240,7 +1238,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND103, () => { client = connect({ @@ -1251,7 +1249,7 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.once('connect', () => { client.publish( @@ -1775,7 +1773,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -1788,7 +1786,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { if (!reconnect) { @@ -3600,7 +3598,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND49, () => { client = connect({ @@ -3609,7 +3607,7 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 100, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('reconnect', () => { reconnectEvent = true @@ -3679,7 +3677,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3692,7 +3690,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { if (!reconnect) { @@ -3728,7 +3726,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3740,7 +3738,7 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { client.subscribe('test', { qos: 2 }, (e) => { @@ -3791,7 +3789,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3804,7 +3802,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { if (!reconnect) { @@ -3841,7 +3839,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3854,7 +3852,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { if (!reconnect) { @@ -3896,7 +3894,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3909,7 +3907,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client.on('connect', () => { if (!reconnect) { @@ -3983,7 +3981,7 @@ export default function abstractTest(server, config, ports) { }) }) - cleanMethod.setServer(server2) + teardownHelper.addServer(server2) server2.listen(ports.PORTAND50, () => { client = connect({ @@ -3996,7 +3994,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) - cleanMethod.setClient(client) + teardownHelper.addClient(client) client['nextId'] = 65535 diff --git a/test/helpers/clean_method.ts b/test/helpers/TeardownHelper.ts similarity index 78% rename from test/helpers/clean_method.ts rename to test/helpers/TeardownHelper.ts index 103bf4950..71768fa6f 100644 --- a/test/helpers/clean_method.ts +++ b/test/helpers/TeardownHelper.ts @@ -98,11 +98,7 @@ type ResetOptions = { * }) * ``` */ -class CleanMethod { - #client: MqttClient | null - - #server: ServerBuilderInstance | null - +class TeardownHelper { #methods: Map< string, { @@ -113,25 +109,23 @@ class CleanMethod { > constructor() { - this.#client = null - this.#server = null this.#methods = new Map() } /** * @description - * Set the client to be used as default to close. + * Add a client to close. */ - setClient(client: MqttClient | null) { - this.#client = client + addClient(client: MqttClient) { + this.add({}, this.closeClient, client) } /** * @description - * Set the server to be used as default to close. + * Add a server to close. */ - setServer(server: ServerBuilderInstance | null) { - this.#server = server + addServer(server: ServerBuilderInstance) { + this.add({}, this.closeServer, server) } /** @@ -158,24 +152,11 @@ class CleanMethod { return id } - /** - * - * @description - * Restart the class to its initial state. - * Set the client and server to `null` and remove all methods stored. - */ - reset(options?: ResetOptions) { - this.#client = null - this.#server = null - - this.resetMethods(options?.method) - } - /** * @description * Remove all methods stored. */ - resetMethods(options?: ResetMethodOptions) { + reset(options?: ResetMethodOptions) { if (options?.removeOnce) { for (const [id, { options: methodOptions }] of this.#methods) { if (methodOptions.executeOnce) { @@ -194,12 +175,10 @@ class CleanMethod { * @default * Use the `client` set in the class. */ - async closeClient(client?: MqttClient | null) { - const clientToClean = client ?? this.#client - - if (clientToClean) { + async closeClient(client: MqttClient) { + if (client) { await new Promise((resolve, reject) => { - clientToClean.end(true, (err) => { + client.end(true, (err) => { if (err) reject(err) else resolve() }) @@ -214,12 +193,10 @@ class CleanMethod { * @default * Use the `server` set in the class. */ - async closeServer(server?: ServerBuilderInstance | null) { - const serverToClean = server ?? this.#server - - if (serverToClean?.listening) { + async closeServer(server: ServerBuilderInstance) { + if (server?.listening) { await new Promise((resolve, reject) => { - serverToClean.close((err) => { + server.close((err) => { if (err) reject(err) else resolve() }) @@ -227,21 +204,6 @@ class CleanMethod { } } - /** - * @description - * Close the `client` and `server` connections. - * - * @default - * Use the `client` and `server` set in the class. - */ - async closeClientAndServer(options?: { - client?: MqttClient | null - server?: ServerBuilderInstance | null - }) { - await this.closeClient(options?.client) - await this.closeServer(options?.server) - } - /** * @param id Method id to be executed. * @@ -249,7 +211,7 @@ class CleanMethod { * Execute a method stored by its id * If the method has the option `executeOnce` set to `true`, it will be removed after execution. */ - async executeMethod(id: string) { + async run(id: string) { const method = this.#methods.get(id) if (!method) { @@ -272,7 +234,7 @@ class CleanMethod { * Execute all methods stored. * If a method has the option `executeOnce` set to `true`, it will be removed after execution. */ - async executeAllMethods() { + async runAll() { if (this.#methods.size === 0) { return } @@ -308,4 +270,4 @@ class CleanMethod { } } -export default CleanMethod +export default TeardownHelper From 7ae2764e7d49e4d3efd425335b3abd1cc3e22670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ximo=20Liberata=20Torres?= Date: Thu, 1 Aug 2024 10:49:06 -0400 Subject: [PATCH 8/8] docs: change examples according new changes --- test/helpers/TeardownHelper.ts | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/test/helpers/TeardownHelper.ts b/test/helpers/TeardownHelper.ts index 71768fa6f..774582009 100644 --- a/test/helpers/TeardownHelper.ts +++ b/test/helpers/TeardownHelper.ts @@ -14,7 +14,7 @@ type AddOptions = { executeOnce?: boolean } -type ResetMethodOptions = { +type ResetOptions = { /** * @description * If `true`, only the methods that have the option `executeOnce` set to `true` will be removed. @@ -24,10 +24,6 @@ type ResetMethodOptions = { removeOnce?: boolean } -type ResetOptions = { - method?: ResetMethodOptions -} - /** * @description * Class to help clean the environment or close opened connections after tests finish. @@ -39,22 +35,22 @@ type ResetOptions = { * import { describe, it } from 'node:test' * import mqtt from './src' * import serverBuilder from './test/server_helpers_for_client_tests' - * import CleanMethod from './test/helpers/clean_method' + * import TeardownHelper from './test/helpers/TeardownHelper' * * * describe('Test', () => { - * const cleanMethod = new CleanMethod() + * const teardownHelper = new TeardownHelper() * * it('should clean the client and server', (t, done) => { * t.after(async () => { - * await cleanMethod.closeClientAndServer() + * await teardownHelper.runAll() * }) * * const server = serverBuilder('8883') * const client = mqtt.connect('mqtt://localhost') * - * cleanMethod.setServer(server) - * cleanMethod.setClient(client) + * teardownHelper.addServer(server) + * teardownHelper.addClient(client) * }) * }) * ``` @@ -64,15 +60,15 @@ type ResetOptions = { * import { describe, it, after } from 'node:test' * import mqtt from './src' * import serverBuilder from './test/server_helpers_for_client_tests' - * import CleanMethod from './test/helpers/clean_method' + * import TeardownHelper from './test/helpers/TeardownHelper' * * * describe('Test', () => { * - * const cleanMethod = new CleanMethod() + * const teardownHelper = new TeardownHelper() * let server = serverBuilder('8883') * - * cleanMethod.add({}, async () => { + * teardownHelper.add({}, async () => { * if (server?.listening) { * await new Promise((resolve, reject) => { * server.close((err) => { @@ -84,14 +80,14 @@ type ResetOptions = { * }) * * after(async () => { - * await cleanMethod.closeAllMethods() + * await teardownHelper.runAll() * }) * * it('should clean the client and server', (t, done) => { * server = serverBuilder('8883') * const client = mqtt.connect('mqtt://localhost') * - * cleanMethod.setClient(client) + * teardownHelper.addClient(client) * done() * }) * @@ -156,7 +152,7 @@ class TeardownHelper { * @description * Remove all methods stored. */ - reset(options?: ResetMethodOptions) { + reset(options?: ResetOptions) { if (options?.removeOnce) { for (const [id, { options: methodOptions }] of this.#methods) { if (methodOptions.executeOnce) { @@ -208,7 +204,7 @@ class TeardownHelper { * @param id Method id to be executed. * * @description - * Execute a method stored by its id + * Execute a method stored by its id. * If the method has the option `executeOnce` set to `true`, it will be removed after execution. */ async run(id: string) {