Skip to content

Commit

Permalink
LQI and RSSI sensors for some devices (#153)
Browse files Browse the repository at this point in the history
* LQI and RSSI sensors for some devices

* add tests

* set only relevant radio details

* Apply suggestions from code review

Co-authored-by: puddly <[email protected]>

* Add docstrings

* update test_application.py

---------

Co-authored-by: puddly <[email protected]>
  • Loading branch information
Shulyaka and puddly authored Oct 11, 2023
1 parent 39ef2f1 commit 468fe69
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 5 deletions.
147 changes: 142 additions & 5 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,15 @@ def test_rx_unknown_device_ieee(app):

@pytest.fixture
def device(app):
"""Sample zigpee.Device fixture."""
"""Sample zigpy.device.Device fixture."""

nwk = t.uint16_t(0x1234)

def _device(new=False, zdo_init=False, nwk=nwk):
def _device(
new=False, zdo_init=False, nwk=0x1234, ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01"
):
from zigpy.device import Device, Status as DeviceStatus

ieee, _ = t.EUI64.deserialize(b"\x08\x07\x06\x05\x04\x03\x02\x01")
nwk = t.uint16_t(nwk)
ieee, _ = t.EUI64.deserialize(ieee)
dev = Device(app, ieee, nwk)
if new:
dev.status = DeviceStatus.NEW
Expand Down Expand Up @@ -732,3 +733,139 @@ async def test_energy_scan(app):
25: 7.264,
26: 3.844,
}


def test_neighbors_updated(app, device):
"""Test LQI from neighbour scan."""
router = device(ieee=b"\x01\x02\x03\x04\x05\x06\x07\x08")
router.radio_details = mock.MagicMock()
end_device = device(ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01")
end_device.radio_details = mock.MagicMock()

app.devices[router.ieee] = router
app.devices[end_device.ieee] = end_device

pan_id = t.ExtendedPanId(b"\x07\x07\x07\x07\x07\x07\x07\x07")
# The router has two neighbors: the coordinator and the end device
neighbors = [
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=app.state.node_info.ieee,
nwk=app.state.node_info.nwk,
device_type=0x0,
rx_on_when_idle=0x1,
relationship=0x00,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=0,
lqi=128,
),
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=end_device.ieee,
nwk=end_device.nwk,
device_type=0x2,
rx_on_when_idle=0x0,
relationship=0x01,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=2,
lqi=100,
),
# Let's also include an unknown device
zdo_t.Neighbor(
extended_pan_id=pan_id,
ieee=t.EUI64(b"\x00\x0F\x0E\x0D\x0C\x0B\x0A\x09"),
nwk=t.NWK(0x9999),
device_type=0x2,
rx_on_when_idle=0x0,
relationship=0x01,
reserved1=0x0,
permit_joining=0x0,
reserved2=0x0,
depth=2,
lqi=99,
),
]

app.neighbors_updated(router.ieee, neighbors)

router.radio_details.assert_called_once_with(lqi=128)
end_device.radio_details.assert_called_once_with(lqi=100)


def test_routes_updated_schedule(app):
"""Test scheduling the sync routes_updated function."""
app.create_task = mock.MagicMock()
app._routes_updated = mock.MagicMock()

ieee = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
routes = []
app.routes_updated(ieee, routes)

assert app.create_task.call_count == 1
app._routes_updated.assert_called_once_with(ieee, routes)


async def test_routes_updated(app, device):
"""Test RSSI on routes scan update."""
rssi = 0x50
app._api._at_command = mock.AsyncMock(return_value=rssi)

router1 = device(ieee=b"\x01\x02\x03\x04\x05\x06\x07\x08")
router1.radio_details = mock.MagicMock()
router2 = device(ieee=b"\x08\x07\x06\x05\x04\x03\x02\x01")
router2.radio_details = mock.MagicMock()

app.devices[router1.ieee] = router1
app.devices[router2.ieee] = router2

# Let router1 be immediate child and route2 be child of the router1.
# Then the routes of router1 would be:
routes = [
zdo_t.Route(
DstNWK=app.state.node_info.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x1,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=app.state.node_info.nwk,
),
zdo_t.Route(
DstNWK=router2.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x0,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=router2.nwk,
),
]

await app._routes_updated(router1.ieee, routes)

router1.radio_details.assert_called_once_with(rssi=-80)
assert router2.radio_details.call_count == 0

router1.radio_details.reset_mock()

routes = [
zdo_t.Route(
DstNWK=router1.nwk,
RouteStatus=0x00,
MemoryConstrained=0x0,
ManyToOne=0x0,
RouteRecordRequired=0x0,
Reserved=0x0,
NextHop=router1.nwk,
)
]
await app._routes_updated(router2.ieee, routes)

assert router1.radio_details.call_count == 0
assert router2.radio_details.call_count == 0

app._api._at_command.assert_awaited_once_with("DB")
40 changes: 40 additions & 0 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, config: dict[str, Any]):
"""Initialize instance."""
super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config))
self._api: zigpy_xbee.api.XBee | None = None
self.topology.add_listener(self)

async def disconnect(self):
"""Shutdown application."""
Expand Down Expand Up @@ -384,6 +385,45 @@ def handle_rx(
)
)

def neighbors_updated(
self, ieee: zigpy.types.EUI64, neighbors: list[zdo_t.Neighbor]
) -> None:
"""Neighbor update from Mgmt_Lqi_req."""
for neighbor in neighbors:
if neighbor.relationship == zdo_t.Neighbor.Relationship.Parent:
device = self.get_device(ieee=ieee)
device.radio_details(lqi=neighbor.lqi)

elif neighbor.relationship == zdo_t.Neighbor.Relationship.Child:
try:
child_device = self.get_device(ieee=neighbor.ieee)
child_device.radio_details(lqi=neighbor.lqi)
except KeyError:
LOGGER.warning("Unknown device %r", neighbor.ieee)

def routes_updated(
self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route]
) -> None:
"""Route update from Mgmt_Rtg_req."""
self.create_task(
self._routes_updated(ieee, routes), f"routes_updated-ieee={ieee}"
)

async def _routes_updated(
self, ieee: zigpy.types.EUI64, routes: list[zdo_t.Route]
) -> None:
"""Get RSSI for adjacent routers on Route update from Mgmt_Rtg_req."""
for route in routes:
if (
route.DstNWK == self.state.node_info.nwk
and route.NextHop == self.state.node_info.nwk
and route.RouteStatus == zdo_t.RouteStatus.Active
):
device = self.get_device(ieee=ieee)
rssi = await self._api._at_command("DB")
device.radio_details(rssi=-rssi)
break


class XBeeCoordinator(zigpy.quirks.CustomDevice):
"""Zigpy Device representing Coordinator."""
Expand Down

0 comments on commit 468fe69

Please sign in to comment.