Skip to content

Commit

Permalink
Fix _parseEventPathTuple
Browse files Browse the repository at this point in the history
  • Loading branch information
agners committed May 23, 2024
1 parent b00ebb4 commit 842b373
Showing 1 changed file with 8 additions and 16 deletions.
24 changes: 8 additions & 16 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,39 +1219,31 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
typing.Tuple[int,
typing.Type[ClusterObjects.ClusterEvent], int]
]):
endpoint = None
cluster = None
event = None
urgent = False
if pathTuple in [('*'), ()]:
# Wildcard
pass
return ClusterAttribute.EventPath()
elif not isinstance(pathTuple, tuple):
logging.debug(type(pathTuple))
if isinstance(pathTuple, int):
endpoint = pathTuple
return ClusterAttribute.EventPath(EndpointId=pathTuple)
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple)
elif issubclass(pathTuple, ClusterObjects.ClusterEvent):
event = pathTuple
return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple)
else:
raise ValueError("Unsupported Event Path")
else:
if pathTuple[0] == '*':
urgent = pathTuple[-1]
pass
return ClusterAttribute.EventPath(Urgent=pathTuple[-1])
else:
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
# endpoint + (cluster) event / endpoint + cluster
endpoint = pathTuple[0]
if issubclass(pathTuple[1], ClusterObjects.Cluster):
cluster = pathTuple[1]
return ClusterAttribute.EventPath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1], Urgent=urgent)
elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent):
event = pathTuple[1]
return ClusterAttribute.EventPath.from_event(EndpointId=pathTuple[0], Event=pathTuple[1], Urgent=urgent)
else:
raise ValueError("Unsupported Attribute Path")
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
return ClusterAttribute.EventPath(
EndpointId=endpoint, Cluster=cluster, Event=event, Urgent=urgent)

async def Read(self, nodeid: int, attributes: typing.List[typing.Union[
None, # Empty tuple, all wildcard
Expand Down

0 comments on commit 842b373

Please sign in to comment.