Skip to content

Commit

Permalink
Support for Tuya-based NEO Siren (closes #444) (#530)
Browse files Browse the repository at this point in the history
* Initial support for Tuya-based NEO Siren

Also implement a base class to map non-standard tuya data to attributes.
Custom tuya clusters can derive from that class and any defined
attribute on the cluster will be automatically mapped.

* Implement temperature and humidity clusters

* Implement the On/Off cluster

* Update documentation

Co-authored-by: Julien '_FrnchFrgg_' RIVAUD <julien@frnchfrgg.pw>
  • Loading branch information
Julien "_FrnchFrgg_" Rivaud and FrnchFrgg authored Nov 2, 2020
1 parent b474b74 commit cae0d37
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 0 deletions.
1 change: 1 addition & 0 deletions Contributors.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
- [Gleb Sinyavskiy](https://github.com/zhulik)
- [Michael Thingnes](https://github.com/thimic)
- [Piotr Mis](https://github.com/piomis)
- [Julien Rivaud](https://github.com/FrnchFrgg)
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ If you are looking to make your first code contribution to this project then we

### Tuya-based
- [TS0601 switch](https://zigbee.blakadder.com/Lerlink_X701A.html): Tuya-based 1-gang switches with neutral (e.g. Lerlink, Lonsonho)
- [Neo Siren](https://zigbee.blakadder.com/Neo_NAS-AB02B0.html): Tuya-based alarm siren with temperature and humidity sensors

# Configuration:

Expand Down
69 changes: 69 additions & 0 deletions zhaquirks/tuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,75 @@ class Command(t.Struct):
}


class TuyaManufClusterAttributes(TuyaManufCluster):
"""Manufacturer specific cluster for Tuya converting attributes <-> commands."""

def handle_cluster_request(self, tsn: int, command_id: int, args: Tuple) -> None:
"""Handle cluster request."""
if command_id not in (0x0001, 0x0002):
return super().handle_cluster_request(tsn, command_id, args)

tuya_cmd = args[0].command_id
tuya_value = args[0].data[1:] # first uint8_t is length

_LOGGER.debug(
"[0x%04x:%s:0x%04x] Received value %s "
"for attribute 0x%04x (command 0x%04x)",
self.endpoint.device.nwk,
self.endpoint.endpoint_id,
self.cluster_id,
repr(tuya_value),
tuya_cmd,
command_id,
)

if tuya_cmd not in self.attributes:
return

# tuya data is in big endian whereas ztypes use little endian
ztype = self.attributes[tuya_cmd][1]
zvalue, _ = ztype.deserialize(bytes(reversed(tuya_value)))
self._update_attribute(tuya_cmd, zvalue)

def read_attributes(
self, attributes, allow_cache=False, only_cache=False, manufacturer=None
):
"""Ignore remote reads as the "get_data" command doesn't seem to do anything."""

return super().read_attributes(
attributes, allow_cache=True, only_cache=True, manufacturer=manufacturer
)

async def write_attributes(self, attributes, manufacturer=None):
"""Defer attributes writing to the set_data tuya command."""

records = self._write_attr_records(attributes)

for record in records:
# serialized in little-endian
data = list(record.value.value.serialize())
# we want big-endian, with length prepended
data.append(len(data))
data.reverse()

cmd_payload = TuyaManufCluster.Command()
cmd_payload.status = 0
cmd_payload.tsn = self.endpoint.device.application.get_sequence()
cmd_payload.command_id = record.attrid
cmd_payload.function = 0
cmd_payload.data = data

await super().command(
TUYA_SET_DATA,
cmd_payload,
manufacturer=manufacturer,
expect_reply=False,
tsn=cmd_payload.tsn,
)

return (foundation.Status.SUCCESS,)


class TuyaOnOff(CustomCluster, OnOff):
"""Tuya On/Off cluster for On/Off device."""

Expand Down
179 changes: 179 additions & 0 deletions zhaquirks/tuya/siren.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Map from manufacturer to standard clusters for the NEO Siren device."""
import logging

from typing import Optional, Union

from zigpy.profiles import zha
from zigpy.quirks import CustomDevice
from zigpy.zcl.clusters.general import Basic, Identify, Ota, OnOff
from zigpy.zcl.clusters.measurement import RelativeHumidity, TemperatureMeasurement
from zigpy.zcl import foundation
import zigpy.types as t

from .. import Bus, LocalDataCluster
from ..const import (
DEVICE_TYPE,
ENDPOINTS,
INPUT_CLUSTERS,
MODELS_INFO,
OUTPUT_CLUSTERS,
PROFILE_ID,
)

from . import TuyaManufClusterAttributes

TUYA_ALARM_ATTR = 0x0168 # [0]/[1] Alarm!
TUYA_TEMP_ALARM_ATTR = 0x0171 # [0]/[1] Disable/Enable alarm by temperature
TUYA_HUMID_ALARM_ATTR = 0x0172 # [0]/[1] Disable/Enable alarm by humidity
TUYA_ALARM_DURATION_ATTR = 0x0267 # [0,0,0,10] duration alarm in second
TUYA_TEMPERATURE_ATTR = 0x0269 # [0,0,0,240] temperature in decidegree
TUYA_HUMIDITY_ATTR = 0x026A # [0,0,0,36] humidity
TUYA_ALARM_MIN_TEMP_ATTR = 0x026B # [0,0,0,18] min alarm temperature threshold
TUYA_ALARM_MAX_TEMP_ATTR = 0x026C # [0,0,0,18] max alarm temperature threshold
TUYA_ALARM_MIN_HUMID_ATTR = 0x026D # [0,0,0,18] min alarm humidity threshold
TUYA_ALARM_MAX_HUMID_ATTR = 0x026E # [0,0,0,18] max alarm humidity threshold
TUYA_MELODY_ATTR = 0x0466 # [5] Melody
TUYA_VOLUME_ATTR = 0x0474 # [0]/[1]/[2] Volume 0-max, 2-low

_LOGGER = logging.getLogger(__name__)


class TuyaManufClusterSiren(TuyaManufClusterAttributes):
"""Manufacturer Specific Cluster of the NEO Siren device."""

manufacturer_attributes = {
TUYA_ALARM_ATTR: ("alarm", t.uint8_t),
TUYA_TEMP_ALARM_ATTR: ("enable_temperature_alarm", t.uint8_t),
TUYA_HUMID_ALARM_ATTR: ("enable_humidity_alarm", t.uint8_t),
TUYA_ALARM_DURATION_ATTR: ("alarm_duration", t.uint32_t),
TUYA_TEMPERATURE_ATTR: ("temperature", t.uint32_t),
TUYA_HUMIDITY_ATTR: ("humidity", t.uint32_t),
TUYA_ALARM_MIN_TEMP_ATTR: ("alarm_temperature_min", t.uint32_t),
TUYA_ALARM_MAX_TEMP_ATTR: ("alarm_temperature_max", t.uint32_t),
TUYA_ALARM_MIN_HUMID_ATTR: ("alarm_humidity_min", t.uint32_t),
TUYA_ALARM_MAX_HUMID_ATTR: ("alarm_humidity_max", t.uint32_t),
TUYA_MELODY_ATTR: ("melody", t.uint8_t),
TUYA_VOLUME_ATTR: ("volume", t.uint8_t),
}

def _update_attribute(self, attrid, value):
super()._update_attribute(attrid, value)
if attrid == TUYA_TEMPERATURE_ATTR:
self.endpoint.device.temperature_bus.listener_event(
"temperature_reported", value * 10 # decidegree to centidegree
)
elif attrid == TUYA_HUMIDITY_ATTR:
self.endpoint.device.humidity_bus.listener_event(
"humidity_reported", value * 100 # whole percentage to 1/1000th
)
elif attrid == TUYA_ALARM_ATTR:
self.endpoint.device.switch_bus.listener_event(
"switch_event", value # boolean 1=on / 0=off
)


class TuyaSirenOnOff(LocalDataCluster, OnOff):
"""Tuya On/Off cluster for siren device."""

ATTR_ID = 0

def __init__(self, *args, **kwargs):
"""Init."""
super().__init__(*args, **kwargs)
self.endpoint.device.switch_bus.add_listener(self)

def switch_event(self, state):
"""Switch event."""
self._update_attribute(self.ATTR_ID, state)

def command(
self,
command_id: Union[foundation.Command, int, t.uint8_t],
*args,
manufacturer: Optional[Union[int, t.uint16_t]] = None,
expect_reply: bool = True,
tsn: Optional[Union[int, t.uint8_t]] = None,
):
"""Override the default command and defer to the alarm attribute."""

if command_id in (0x0000, 0x0001):
return self.endpoint.tuya_manufacturer.write_attributes(
{TUYA_ALARM_ATTR: command_id}, manufacturer=manufacturer
)

return foundation.Status.UNSUP_CLUSTER_COMMAND


class TuyaTemperatureMeasurement(LocalDataCluster, TemperatureMeasurement):
"""Temperature cluster acting from events from temperature bus."""

cluster_id = TemperatureMeasurement.cluster_id
ATTR_ID = 0

def __init__(self, *args, **kwargs):
"""Init."""
super().__init__(*args, **kwargs)
self.endpoint.device.temperature_bus.add_listener(self)

def temperature_reported(self, value):
"""Temperature reported."""
self._update_attribute(self.ATTR_ID, value)


class TuyaRelativeHumidity(LocalDataCluster, RelativeHumidity):
"""Humidity cluster acting from events from humidity bus."""

cluster_id = RelativeHumidity.cluster_id
ATTR_ID = 0

def __init__(self, *args, **kwargs):
"""Init."""
super().__init__(*args, **kwargs)
self.endpoint.device.humidity_bus.add_listener(self)

def humidity_reported(self, value):
"""Humidity reported."""
self._update_attribute(self.ATTR_ID, value)


class TuyaSiren(CustomDevice):
"""NEO Tuya Siren and humidity/temperature sensor."""

def __init__(self, *args, **kwargs):
"""Init device."""
self.temperature_bus = Bus()
self.humidity_bus = Bus()
self.switch_bus = Bus()
super().__init__(*args, **kwargs)

signature = {
# endpoint=1 profile=260 device_type=0 device_version=0 input_clusters=[0, 3]
# output_clusters=[3, 25]>
MODELS_INFO: [("_TYST11_d0yu2xgi", "0yu2xgi")],
ENDPOINTS: {
1: {
PROFILE_ID: zha.PROFILE_ID,
DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH,
INPUT_CLUSTERS: [Basic.cluster_id, Identify.cluster_id],
OUTPUT_CLUSTERS: [Identify.cluster_id, Ota.cluster_id],
}
},
}

replacement = {
ENDPOINTS: {
1: {
PROFILE_ID: zha.PROFILE_ID,
DEVICE_TYPE: zha.DeviceType.IAS_WARNING_DEVICE,
INPUT_CLUSTERS: [
Basic.cluster_id,
Identify.cluster_id,
TuyaManufClusterSiren,
TuyaTemperatureMeasurement,
TuyaRelativeHumidity,
TuyaSirenOnOff,
],
OUTPUT_CLUSTERS: [Identify.cluster_id, Ota.cluster_id],
}
}
}

0 comments on commit cae0d37

Please sign in to comment.