-
Notifications
You must be signed in to change notification settings - Fork 5
/
neighbours.py
210 lines (167 loc) · 6.1 KB
/
neighbours.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import asyncio
import enum
import logging
import os
from random import uniform
import zigpy.types as t
import zigpy.zdo.types as zdo_t
from zigpy.exceptions import DeliveryError
from zigpy.util import retryable
from homeassistant.util.json import save_json
LOGGER = logging.getLogger(__name__)
@retryable((DeliveryError, asyncio.TimeoutError), tries=5)
def wrapper(cmd, *args, **kwargs):
    """Call ``cmd`` with the supplied arguments and hand back its result.

    The function is decorated with ``retryable`` configured for
    ``DeliveryError`` and ``asyncio.TimeoutError`` with ``tries=5``.
    NOTE(review): presumably ``cmd`` returns an awaitable that the decorator
    re-runs on those errors -- confirm against ``zigpy.util.retryable``.
    """
    outcome = cmd(*args, **kwargs)
    return outcome
async def routes_and_neighbours(app, listener, ieee, cmd, data, service):
    """Scan the routing and neighbour tables of a single device.

    When ``ieee`` is None an error is logged and nothing else happens.
    Otherwise the device is looked up with ``app.get_device(ieee=ieee)`` and
    the scan is delegated to ``_routes_and_neighbours``. ``cmd`` and ``data``
    are accepted but not used here; ``service`` is only logged.
    """
    if ieee is not None:
        LOGGER.debug("Getting routes and neighbours: %s", service)
        target = app.get_device(ieee=ieee)
        await _routes_and_neighbours(target, listener)
    else:
        LOGGER.error("missing ieee")
async def _routes_and_neighbours(device, listener):
    """Scan one device and write its routing and neighbour tables to disk.

    Each table is requested with a 180 second limit; if a request times out
    the corresponding table is stored as an empty list. The results are
    written with ``save_json`` to ``<config_dir>/scans/routes_<ieee>.txt`` and
    ``<config_dir>/scans/neighbours_<ieee>.txt``, where ``<ieee>`` is the
    device address rendered as lowercase hex without separators.

    Args:
        device: object exposing ``ieee`` and whatever ``async_get_routes`` /
            ``async_get_neighbours`` need.
        listener: object whose ``_hass.config.config_dir`` names the
            configuration directory.
    """
    try:
        routes = await asyncio.wait_for(async_get_routes(device), 180)
    except asyncio.TimeoutError:
        LOGGER.debug("%s: timed out reading routing table", device.ieee)
        routes = []

    # Short randomized pause between the two scans.
    await asyncio.sleep(uniform(1.0, 1.5))

    try:
        nbns = await asyncio.wait_for(async_get_neighbours(device), 180)
    except asyncio.TimeoutError:
        LOGGER.debug("%s: timed out reading neighbour table", device.ieee)
        nbns = []

    # Make sure the output directory exists before writing into it;
    # otherwise the very first scan on a fresh config dir fails.
    scan_dir = os.path.join(listener._hass.config.config_dir, "scans")
    os.makedirs(scan_dir, exist_ok=True)

    ieee_tail = "".join(["%02x" % (o,) for o in device.ieee])
    file_suffix = "_{}.txt".format(ieee_tail)

    # NOTE(review): save_json looks like blocking file I/O called from a
    # coroutine -- consider moving it to an executor.
    routes_name = os.path.join(scan_dir, "routes" + file_suffix)
    save_json(routes_name, routes)

    neighbours_name = os.path.join(scan_dir, "neighbours" + file_suffix)
    save_json(neighbours_name, nbns)

    LOGGER.debug("Wrote scan results to '%s' and '%s'", routes_name, neighbours_name)
async def all_routes_and_neighbours(app, listener, ieee, cmd, data, service):
    """Scan routes and neighbours of every device that is not an end device.

    Devices are taken from ``app.devices.values()`` and kept when
    ``node_desc.is_end_device`` is falsy. Each kept device is scanned in turn
    through ``_routes_and_neighbours``, with progress logged before and after.
    ``ieee``, ``cmd`` and ``data`` are accepted but not used here.
    """
    LOGGER.debug("Getting routes and neighbours for all devices: %s", service)

    selected = []
    for candidate in app.devices.values():
        if candidate.node_desc.is_end_device:
            continue
        selected.append(candidate)
    total = len(selected)

    for position, device in enumerate(selected, start=1):
        LOGGER.debug(
            "%s: Quering routes and neighbours: %s out of %s",
            device.ieee,
            position,
            total,
        )
        await _routes_and_neighbours(device, listener)
        LOGGER.debug("%s: Got %s out of %s", device.ieee, position, total)
async def async_get_neighbours(device):
    """Pull neighbor table from a device.

    Issues ``Mgmt_Lqi_req`` ZDO requests starting at index 0 and advances the
    start index by the number of entries received. Paging stops when the
    device answers with a non-SUCCESS status, when a response carries no
    entries, or when the index reaches the ``Entries`` total reported by the
    device. A short randomized pause is inserted between requests.

    Args:
        device: object exposing ``ieee`` and ``zdo.request``.

    Returns:
        A list of dicts (one per neighbor entry) sorted by the ``"ieee"``
        string, with keys ``pan_id``, ``ieee``, ``device_type``,
        ``rx_on_when_idle``, ``relationship``, ``new_joins_accepted``,
        ``depth`` and ``lqi``.
    """

    # The enums are defined once per call rather than once per table entry.
    class NeighbourType(enum.IntEnum):
        Coordinator = 0x0
        Router = 0x1
        End_Device = 0x2
        Unknown = 0x3

    class RxOnIdle(enum.IntEnum):
        Off = 0x0
        On = 0x1
        Unknown = 0x2

    class Relation(enum.IntEnum):
        Parent = 0x0
        Child = 0x1
        Sibling = 0x2
        None_of_the_above = 0x3
        Previous_Child = 0x4

    class PermitJoins(enum.IntEnum):
        Not_Accepting = 0x0
        Accepting = 0x1
        Unknown = 0x2

    def _enum_name(enum_cls, raw):
        """Return the member name for ``raw`` or an ``undefined_0x..`` tag."""
        try:
            return enum_cls(raw).name
        except ValueError:
            return "undefined_0x{:02x}".format(raw)

    def _process_neighbor(nbg):
        """Return dict of a neighbor entry."""
        packed = nbg.NeighborType
        return {
            "pan_id": str(nbg.PanId),
            "ieee": str(nbg.IEEEAddr),
            # NeighborType packs three sub-fields: bits 0-1, 2-3 and 4-6.
            "device_type": _enum_name(NeighbourType, packed & 0x03),
            "rx_on_when_idle": _enum_name(RxOnIdle, (packed >> 2) & 0x03),
            "relationship": _enum_name(Relation, (packed >> 4) & 0x07),
            # Both low bits are needed so that 0x1 (Accepting) can be seen;
            # a 0x02 mask could only ever produce 0x0 or 0x2.
            "new_joins_accepted": _enum_name(
                PermitJoins, nbg.PermitJoining & 0x03
            ),
            "depth": nbg.Depth,
            "lqi": nbg.LQI,
        }

    result = []
    idx = 0
    while True:
        status, val = await device.zdo.request(zdo_t.ZDOCmd.Mgmt_Lqi_req, idx)
        LOGGER.debug(
            "%s: neighbor request Status: %s. Response: %r", device.ieee, status, val
        )
        if zdo_t.Status.SUCCESS != status:
            LOGGER.debug("%s: device does not support 'Mgmt_Lqi_req'", device.ieee)
            break

        neighbors = val.NeighborTableList
        if not neighbors:
            # An empty page cannot advance idx; stop instead of re-requesting
            # the same index forever.
            break

        result.extend(_process_neighbor(neighbor) for neighbor in neighbors)
        idx += len(neighbors)
        if idx >= val.Entries:
            break
        await asyncio.sleep(uniform(1.0, 1.5))

    return sorted(result, key=lambda x: x["ieee"])
async def async_get_routes(device):
    """Pull routing table from a device.

    Issues ``Mgmt_Rtg_req`` ZDO requests starting at index 0 and advances the
    start index by the number of entries received. Paging stops when the
    device answers with a non-SUCCESS status, when a response carries no
    entries, or when the index reaches the ``Entries`` total reported by the
    device. A short randomized pause is inserted between requests.

    Args:
        device: object exposing ``ieee`` and ``zdo.request``.

    Returns:
        A list of dicts in the order received, with keys ``destination``,
        ``next_hop``, ``status``, ``memory_constrained``, ``many_to_one`` and
        ``route_record_required``.
    """

    # Defined once per call rather than once per routing entry.
    class RouteStatus(enum.IntEnum):
        Active = 0x0
        Discovery_Underway = 0x1
        Discovery_Failed = 0x2
        Inactive = 0x3
        Validation_Underway = 0x4

    def _process_route(route):
        """Return a dict representing routing entry."""
        flags = route.RouteStatus
        # The low three bits carry the status code; bits 3, 4 and 5 are
        # individual boolean flags.
        raw = flags & 0x07
        try:
            cooked = RouteStatus(raw).name
        except ValueError:
            cooked = "reserved_{:02x}".format(raw)
        return {
            "destination": "0x{:04x}".format(route.DstNWK),
            "next_hop": "0x{:04x}".format(route.NextHop),
            "status": cooked,
            "memory_constrained": bool((flags >> 3) & 0x01),
            "many_to_one": bool((flags >> 4) & 0x01),
            "route_record_required": bool((flags >> 5) & 0x01),
        }

    routes = []
    idx = 0
    while True:
        status, val = await device.zdo.request(zdo_t.ZDOCmd.Mgmt_Rtg_req, idx)
        LOGGER.debug(
            "%s: route request Status:%s. Routes: %r", device.ieee, status, val
        )
        if zdo_t.Status.SUCCESS != status:
            LOGGER.debug("%s: Does not support 'Mgmt_rtg_req': %s", device.ieee, status)
            break

        page = val.RoutingTableList
        if not page:
            # An empty page cannot advance idx; stop instead of re-requesting
            # the same index forever.
            break

        routes.extend(_process_route(route) for route in page)
        idx += len(page)
        if idx >= val.Entries:
            break
        await asyncio.sleep(uniform(1.0, 1.5))

    return routes