From cb4882e1b13dbff11d2971b9f9e0fbc378ea9d3b Mon Sep 17 00:00:00 2001 From: Denis Shulyaka Date: Sat, 17 Dec 2022 02:47:17 +0300 Subject: [PATCH] Enhance TuyaData type --- tests/test_tuya_clusters.py | 2 +- tests/test_tuya_mcu.py | 11 +------ zhaquirks/tuya/__init__.py | 58 ++++++++++++++++++++++------------ zhaquirks/tuya/mcu/__init__.py | 31 ++---------------- 4 files changed, 42 insertions(+), 60 deletions(-) diff --git a/tests/test_tuya_clusters.py b/tests/test_tuya_clusters.py index 98fcefef4d..1b2763188f 100644 --- a/tests/test_tuya_clusters.py +++ b/tests/test_tuya_clusters.py @@ -36,7 +36,7 @@ def test_tuya_data_value(): assert rest == extra assert r.dp_type == 2 - assert r.raw == b"\xdb\x02\x00\x00" + assert r.raw == b"\x00\x00\x02\xdb" assert r.payload == 731 diff --git a/tests/test_tuya_mcu.py b/tests/test_tuya_mcu.py index dc4a86fc72..45836347e7 100644 --- a/tests/test_tuya_mcu.py +++ b/tests/test_tuya_mcu.py @@ -4,16 +4,14 @@ from unittest import mock import pytest -import zigpy.types as t from zigpy.zcl import foundation import zhaquirks -from zhaquirks.tuya import TUYA_MCU_VERSION_RSP, TUYA_SET_TIME +from zhaquirks.tuya import TUYA_MCU_VERSION_RSP, TUYA_SET_TIME, TuyaDPType from zhaquirks.tuya.mcu import ( ATTR_MCU_VERSION, TuyaAttributesCluster, TuyaClusterData, - TuyaDPType, TuyaMCUCluster, ) @@ -216,13 +214,6 @@ async def test_tuya_methods(zigpy_device_from_quirk, quirk): async def test_tuya_mcu_classes(): """Test tuya conversion from Data to ztype and reverse.""" - # Test TuyaDPType class - assert len(TuyaDPType) == 6 - assert TuyaDPType.BOOL.ztype == t.Bool - # no ztype for TuyaDPType.RAW - assert not TuyaDPType.RAW.ztype - assert TuyaDPType(3) == TuyaDPType.STRING - # Test TuyaMCUCluster.MCUVersion class mcu_version = TuyaMCUCluster.MCUVersion.deserialize(b"\x00\x03\x82")[0] assert mcu_version diff --git a/zhaquirks/tuya/__init__.py b/zhaquirks/tuya/__init__.py index 0669fe7da6..0f74fd3c83 100644 --- a/zhaquirks/tuya/__init__.py +++ b/zhaquirks/tuya/__init__.py @@ -22,6 +22,7 @@ SHORT_PRESS, ZHA_SEND_EVENT, ) +from zhaquirks.xbee.types import uint32_t as uint32_t_be # --------------------------------------------------------- # Tuya Custom Cluster ID @@ -172,31 +173,26 @@ class TuyaData(t.Struct): function: t.uint8_t raw: t.LVBytes - @classmethod - def deserialize(cls, data: bytes) -> Tuple["TuyaData", bytes]: - """Deserialize data.""" - res = cls() - res.dp_type, data = TuyaDPType.deserialize(data) - res.function, data = t.uint8_t.deserialize(data) - res.raw, data = t.LVBytes.deserialize(data) - if res.dp_type not in ( - TuyaDPType.BITMAP, - TuyaDPType.STRING, - TuyaDPType.ENUM, - TuyaDPType.RAW, - ): - res.raw = res.raw[::-1] - return res, data - @property - def payload(self) -> Union[t.Bool, t.CharacterString, t.uint32_t, t.data32]: + def payload( + self, + ) -> Union[ + uint32_t_be, + t.Bool, + t.CharacterString, + t.enum8, + t.bitmap8, + t.bitmap16, + t.bitmap32, + t.LVBytes, + ]: """Payload accordingly to data point type.""" if self.dp_type == TuyaDPType.VALUE: - return t.uint32_t.deserialize(self.raw)[0] + return uint32_t_be.deserialize(self.raw)[0] elif self.dp_type == TuyaDPType.BOOL: return t.Bool.deserialize(self.raw)[0] elif self.dp_type == TuyaDPType.STRING: - return self.raw.decode("utf8") + return t.CharacterString(self.raw.decode("utf8")) elif self.dp_type == TuyaDPType.ENUM: return t.enum8.deserialize(self.raw)[0] elif self.dp_type == TuyaDPType.BITMAP: @@ -207,8 +203,30 @@ def payload(self) -> Union[t.Bool, t.CharacterString, t.uint32_t, t.data32]: raise ValueError(f"Wrong bitmap length: {len(self.raw)}") from exc elif self.dp_type == TuyaDPType.RAW: return self.raw + else: + raise ValueError(f"Unknown {self.dp_type} datapoint type") - raise ValueError(f"Unknown {self.dp_type} datapoint type") + @payload.setter + def payload(self, value): + """Set payload accordingly to data point type.""" + if self.dp_type == TuyaDPType.VALUE: + self.raw = uint32_t_be(value).serialize() + elif self.dp_type == TuyaDPType.BOOL: + self.raw = t.Bool(value).serialize() + elif self.dp_type == TuyaDPType.STRING: + self.raw = value + elif self.dp_type == TuyaDPType.ENUM: + self.raw = t.enum8(value).serialize() + elif self.dp_type == TuyaDPType.BITMAP: + bitmaps = {1: t.bitmap8, 2: t.bitmap16, 4: t.bitmap32} + try: + self.raw = bitmaps[len(value)](value).serialize() + except KeyError as exc: + raise ValueError(f"Wrong bitmap length: {len(value)}") from exc + elif self.dp_type == TuyaDPType.RAW: + self.raw = value.serialize() + else: + raise ValueError(f"Unknown {self.dp_type} datapoint type") class Data(t.List, item_type=t.uint8_t): diff --git a/zhaquirks/tuya/mcu/__init__.py b/zhaquirks/tuya/mcu/__init__.py index 1f4c3a2dbb..bc51e2b5cb 100644 --- a/zhaquirks/tuya/mcu/__init__.py +++ b/zhaquirks/tuya/mcu/__init__.py @@ -15,12 +15,12 @@ TUYA_MCU_VERSION_RSP, TUYA_SET_DATA, TUYA_SET_TIME, - Data, NoManufacturerCluster, PowerOnState, TuyaCommand, TuyaData, TuyaDatapointData, + TuyaDPType, TuyaLocalCluster, TuyaNewManufCluster, TuyaTimePayload, @@ -30,24 +30,6 @@ ATTR_MCU_VERSION = 0xEF00 -class TuyaDPType(t.enum8): - """Tuya DataPoint Type.""" - - RAW = 0x00, None - BOOL = 0x01, t.Bool - VALUE = 0x02, t.uint32_t - STRING = 0x03, None - ENUM = 0x04, t.enum8 - BITMAP = 0x05, None - - def __new__(cls, value, ztype): - """Overload instance to store the ztype.""" - - member = t.enum8.__new__(cls, value) - member.ztype = ztype - return member - - @dataclasses.dataclass class DPToAttributeMapping: """Container for datapoint to cluster attribute update mapping.""" @@ -231,20 +213,11 @@ def from_cluster_data(self, data: TuyaClusterData) -> Optional[TuyaCommand]: args.append(val) val = mapping.dp_converter(*args) self.debug("converted: %s", val) - if datapoint_type.ztype: - val = datapoint_type.ztype(val) - self.debug("ztype: %s", val) - val = Data(val) - self.debug("from_value: %s", val) tuya_data = TuyaData() tuya_data.dp_type = datapoint_type tuya_data.function = 0 - if datapoint_type == TuyaDPType.RAW: - tuya_data.raw = bytes(reversed(val[1:])) - else: - tuya_data.raw = t.LVBytes.deserialize(val)[0] - self.debug("raw: %s", tuya_data.raw) + tuya_data.payload = val dpd = TuyaDatapointData(dp, tuya_data) cmd_payload.datapoints = [dpd]