Skip to content

Commit

Permalink
Joining with link key (#150)
Browse files Browse the repository at this point in the history
* joining with install code support

* add test

* fix lint

* separate permit_with_key and permit_with_link_key methods
  • Loading branch information
Shulyaka authored Oct 1, 2023
1 parent 098b19c commit 53d516c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
20 changes: 20 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,26 @@ def test_handle_tx_status_duplicate(api):
assert send_fut.set_exception.call_count == 0


def test_handle_registration_status(api):
frame_id = 0x12
status = xbee_api.RegistrationStatus.SUCCESS
fut = asyncio.Future()
api._awaiting[frame_id] = (fut,)
api._handle_registration_status(frame_id, status)
assert fut.done() is True
assert fut.result() == xbee_api.RegistrationStatus.SUCCESS
assert fut.exception() is None

frame_id = 0x13
status = xbee_api.RegistrationStatus.KEY_TABLE_IS_FULL
fut = asyncio.Future()
api._awaiting[frame_id] = (fut,)
api._handle_registration_status(frame_id, status)
assert fut.done() is True
with pytest.raises(RuntimeError, match="Registration Status: KEY_TABLE_IS_FULL"):
fut.result()


async def test_command_mode_at_cmd(api):
command = "+++"

Expand Down
26 changes: 26 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,32 @@ async def test_permit(app):
assert app._api._at_command.call_args_list[0][0][1] == time_s


async def test_permit_with_key(app):
app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS)
app._api._at_command = mock.AsyncMock(return_value="OK")
node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
code = b"\xC9\xA7\xD2\x44\x1A\x71\x16\x95\xCD\x62\x17\x0D\x33\x28\xEA\x2B\x42\x3D"
time_s = 500
await app.permit_with_key(node=node, code=code, time_s=time_s)
app._api._at_command.assert_called_once_with("KT", time_s)
app._api._command.assert_called_once_with(
"register_joining_device", node, 0xFFFE, 1, code
)


async def test_permit_with_link_key(app):
app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS)
app._api._at_command = mock.AsyncMock(return_value="OK")
node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
link_key = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F"
time_s = 500
await app.permit_with_link_key(node=node, link_key=link_key, time_s=time_s)
app._api._at_command.assert_called_once_with("KT", time_s)
app._api._command.assert_called_once_with(
"register_joining_device", node, 0xFFFE, 0, link_key
)


async def _test_request(
app, expect_reply=True, send_success=True, send_timeout=False, **kwargs
):
Expand Down
31 changes: 30 additions & 1 deletion zigpy_xbee/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
_UNDEFINED = 0xFF


class RegistrationStatus(t.uint8_t, t.UndefinedEnum):
SUCCESS = 0x00
KEY_TOO_LONG = 0x01
TRANSIENT_KEY_TABLE_IS_FULL = 0x18
ADDRESS_NOT_FOUND_IN_THE_KEY_TABLE = 0xB1
KEY_IS_INVALID_OR_RESERVED = 0xB2
INVALID_ADDRESS = 0xB3
KEY_TABLE_IS_FULL = 0xB4
SECURITY_DATA_IS_INVALID_INSTALL_CODE_CRC_FAILS = 0xBD

UNKNOWN_MODEM_STATUS = 0xFF
_UNDEFINED = 0xFF


# https://www.digi.com/resources/documentation/digidocs/PDFs/90000976.pdf
COMMAND_REQUESTS = {
"at": (0x08, (t.FrameId, t.ATCommand, t.Bytes), 0x88),
Expand Down Expand Up @@ -120,7 +134,11 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
(t.FrameId, t.EUI64, t.NWK, t.uint8_t, t.Relays),
None,
),
"register_joining_device": (0x24, (), None),
"register_joining_device": (
0x24,
(t.FrameId, t.EUI64, t.uint16_t, t.uint8_t, t.Bytes),
0xA4,
),
}
COMMAND_RESPONSES = {
"at_response": (0x88, (t.FrameId, t.ATCommand, t.uint8_t, t.Bytes), None),
Expand Down Expand Up @@ -155,6 +173,7 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
"extended_status": (0x98, (), None),
"route_record_indicator": (0xA1, (t.EUI64, t.NWK, t.uint8_t, t.Relays), None),
"many_to_one_rri": (0xA3, (t.EUI64, t.NWK, t.uint8_t), None),
"registration_status": (0xA4, (t.FrameId, RegistrationStatus), None),
"node_id_indicator": (0x95, (), None),
}

Expand Down Expand Up @@ -201,6 +220,7 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
"EO": t.uint8_t,
"NK": t.Bytes, # 128-bit value
"KY": t.Bytes, # 128-bit value
"KT": t.uint16_t, # 0x1E - 0xFFFF
# RF interfacing commands
"PL": t.uint8_t, # 0 - 4 (basically an Enum)
"PM": t.Bool,
Expand Down Expand Up @@ -549,6 +569,15 @@ def _handle_tx_status(self, frame_id, nwk, tries, tx_status, dsc_status):
except asyncio.InvalidStateError as ex:
LOGGER.debug("duplicate tx_status for %s nwk? State: %s", nwk, ex)

def _handle_registration_status(self, frame_id, status):
(fut,) = self._awaiting.pop(frame_id)
if status:
fut.set_exception(RuntimeError(f"Registration Status: {status.name}"))
return
LOGGER.debug(f"Registration Status: {status.name}")

fut.set_result(status)

def set_application(self, app):
self._app = app

Expand Down
17 changes: 15 additions & 2 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,21 @@ async def permit_ncp(self, time_s=60):
await self._api._at_command("NJ", time_s)
await self._api._at_command("AC")

async def permit_with_key(self, node, code, time_s=60):
raise NotImplementedError("XBee does not support install codes")
async def permit_with_link_key(
self, node: EUI64, link_key: zigpy.types.KeyData, time_s: int = 500, key_type=0
):
"""Permits a new device to join with the given IEEE and link key."""
assert 0x1E <= time_s <= 0xFFFF
await self._api._at_command("KT", time_s)
reserved = 0xFFFE
# Key type:
# 0 = Pre-configured Link Key (KY command of the joining device)
# 1 = Install Code With CRC (I? command of the joining device)
await self._api.register_joining_device(node, reserved, key_type, link_key)

async def permit_with_key(self, node: EUI64, code: bytes, time_s=500):
"""Permits a new device to join with the given IEEE and Install Code."""
await self.permit_with_link_key(node, code, time_s, key_type=1)

def handle_modem_status(self, status):
LOGGER.info("Modem status update: %s (%s)", status.name, status.value)
Expand Down

0 comments on commit 53d516c

Please sign in to comment.