diff --git a/README.md b/README.md index e48a028953..e7fd661ca6 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,8 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec ### Developing Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `npm run build`. For faster development instead of running `npm run build` you can run `npm run build-watch` in another terminal session, this will recompile as you change files. +In first time before building you need to run `npm install --include=dev` +Before submitting changes run `npm run test-with-coverage`, `npm run pretty:check` and `npm run eslint` ## Supported devices diff --git a/lib/extension/receive.ts b/lib/extension/receive.ts index b3f35df3d4..2d8dc23a3a 100755 --- a/lib/extension/receive.ts +++ b/lib/extension/receive.ts @@ -3,6 +3,7 @@ import assert from 'assert'; import bind from 'bind-decorator'; import debounce from 'debounce'; import stringify from 'json-stable-stringify-without-jsonify'; +import throttle from 'throttleit'; import * as zhc from 'zigbee-herdsman-converters'; @@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void}; export default class Receive extends Extension { private elapsed: {[s: string]: number} = {}; private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {}; + private throttlers: {[s: string]: {publish: PublishEntityState}} = {}; async start(): Promise { this.eventBus.onPublishEntityState(this, this.onPublishEntityState); @@ -68,6 +70,20 @@ export default class Receive extends Extension { this.debouncers[device.ieeeAddr].publish(); } + async publishThrottle(device: Device, payload: KeyValue, time: number): Promise { + if (!this.throttlers[device.ieeeAddr]) { + this.throttlers[device.ieeeAddr] = { + publish: throttle(this.publishEntityState, time * 1000), + }; + } + + // Update state cache right away. This makes sure that during throttling cached state is always up to date. + // By updating cache we make sure that state cache is always up-to-date. + this.state.set(device, payload); + + await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle'); + } + // if debounce_ignore are specified (Array of strings) // then all newPayload values with key present in debounce_ignore // should equal or be undefined in oldPayload @@ -130,9 +146,11 @@ export default class Receive extends Extension { this.elapsed[data.device.ieeeAddr] = now; } - // Check if we have to debounce + // Check if we have to debounce or throttle if (data.device.options.debounce) { this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore); + } else if (data.device.options.throttle) { + await this.publishThrottle(data.device, payload, data.device.options.throttle); } else { await this.publishEntityState(data.device, payload); } diff --git a/lib/types/types.d.ts b/lib/types/types.d.ts index 97bc1f52f1..365bd3fdb4 100644 --- a/lib/types/types.d.ts +++ b/lib/types/types.d.ts @@ -46,7 +46,7 @@ declare global { properties?: {messageExpiryInterval: number}; } type Scene = {id: number; name: string}; - type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached'; + type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached' | 'publishThrottle'; type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise; type RecursivePartial = {[P in keyof T]?: RecursivePartial}; interface KeyValue { @@ -232,6 +232,7 @@ declare global { retrieve_state?: boolean; debounce?: number; debounce_ignore?: string[]; + throttle?: number; filtered_attributes?: string[]; filtered_cache?: string[]; filtered_optimistic?: string[]; diff --git a/package-lock.json b/package-lock.json index 507a1a11eb..cfa77b6b2d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,6 +27,7 @@ "rimraf": "^6.0.1", "semver": "^7.6.3", "source-map-support": "^0.5.21", + "throttleit": "^2.1.0", "uri-js": "^4.4.1", "winston": "^3.14.2", "winston-syslog": "^2.7.1", @@ -9700,6 +9701,18 @@ "dev": true, "license": "MIT" }, + "node_modules/throttleit": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/throttleit/-/throttleit-2.1.0.tgz", + "integrity": "sha512-nt6AMGKW1p/70DF/hGBdJB57B8Tspmbp5gfJ8ilhLnt7kkr2ye7hzD6NVG8GGErk2HWF34igrL2CXmNIkzKqKw==", + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/thunky": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/thunky/-/thunky-1.1.0.tgz", diff --git a/package.json b/package.json index 9671bf8334..296f544d71 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "rimraf": "^6.0.1", "semver": "^7.6.3", "source-map-support": "^0.5.21", + "throttleit": "^2.1.0", "uri-js": "^4.4.1", "winston": "^3.14.2", "winston-syslog": "^2.7.1", diff --git a/test/receive.test.js b/test/receive.test.js index dfcfc08e38..2064aaa10a 100755 --- a/test/receive.test.js +++ b/test/receive.test.js @@ -355,6 +355,79 @@ describe('Receive', () => { expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2}); }); + it('Should throttle multiple messages from spamming devices', async () => { + const device = zigbeeHerdsman.devices.SPAMMER; + const throttle_for_testing = 1; + settings.set(['device_options', 'throttle'], throttle_for_testing); + settings.set(['device_options', 'retain'], true); + settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1'); + const data1 = {measuredValue: 1}; + const payload1 = { + data: data1, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload1); + const data2 = {measuredValue: 2}; + const payload2 = { + data: data2, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload2); + const data3 = {measuredValue: 3}; + const payload3 = { + data: data3, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload3); + await flushPromises(); + + expect(MQTT.publish).toHaveBeenCalledTimes(1); + await flushPromises(); + expect(MQTT.publish).toHaveBeenCalledTimes(1); + expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01}); + expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true}); + + // Now we try after elapsed time to see if it publishes next message + const timeshift = throttle_for_testing * 2000; + jest.advanceTimersByTime(timeshift); + expect(MQTT.publish).toHaveBeenCalledTimes(2); + await flushPromises(); + + expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03}); + expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true}); + + const data4 = {measuredValue: 4}; + const payload4 = { + data: data4, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload4); + await flushPromises(); + + expect(MQTT.publish).toHaveBeenCalledTimes(3); + expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04}); + expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true}); + }); + it('Shouldnt republish old state', async () => { // https://github.com/Koenkk/zigbee2mqtt/issues/3572 const device = zigbeeHerdsman.devices.bulb; diff --git a/test/stub/zigbeeHerdsman.js b/test/stub/zigbeeHerdsman.js index c433e9502b..5cdfe3c563 100644 --- a/test/stub/zigbeeHerdsman.js +++ b/test/stub/zigbeeHerdsman.js @@ -503,6 +503,8 @@ const devices = { 'lumi.sensor_86sw2.es1', ), WSDCGQ11LM: new Device('EndDevice', '0x0017880104e45522', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'), + // This are not a real spammer device, just copy of previous to test the throttle filter + SPAMMER: new Device('EndDevice', '0x0017880104e455fe', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'), RTCGQ11LM: new Device('EndDevice', '0x0017880104e45523', 6540, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.sensor_motion.aq2'), ZNCZ02LM: ZNCZ02LM, E1743: new Device('Router', '0x0017880104e45540', 6540, 4476, [new Endpoint(1, [0], [])], true, 'Mains (single phase)', 'TRADFRI on/off switch'),