From 66a1f427291049949da4c1d81f34d426ff7e19fa Mon Sep 17 00:00:00 2001 From: Denis Shulyaka Date: Mon, 9 Oct 2023 22:36:42 +0300 Subject: [PATCH] Use zigpy types where possible (#162) * use zigpy types * tests for EUI64 --- tests/test_api.py | 27 ++-- tests/test_application.py | 10 +- tests/test_types.py | 50 ++++---- zigpy_xbee/api.py | 251 +++++++++++++++----------------------- zigpy_xbee/types.py | 202 ++++++++++++++---------------- 5 files changed, 235 insertions(+), 305 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index f94aced..70490b6 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -4,8 +4,9 @@ import pytest import serial import zigpy.exceptions +import zigpy.types as t -from zigpy_xbee import api as xbee_api, types as t, uart +from zigpy_xbee import api as xbee_api, types as xbee_t, uart import zigpy_xbee.config from zigpy_xbee.zigbee.application import ControllerApplication @@ -335,19 +336,19 @@ def _send_modem_event(api, event): def test_handle_modem_status(api): api._running.clear() api._reset.set() - _send_modem_event(api, xbee_api.ModemStatus.COORDINATOR_STARTED) + _send_modem_event(api, xbee_t.ModemStatus.COORDINATOR_STARTED) assert api.is_running is True assert api.reset_event.is_set() is True api._running.set() api._reset.set() - _send_modem_event(api, xbee_api.ModemStatus.DISASSOCIATED) + _send_modem_event(api, xbee_t.ModemStatus.DISASSOCIATED) assert api.is_running is False assert api.reset_event.is_set() is True api._running.set() api._reset.clear() - _send_modem_event(api, xbee_api.ModemStatus.HARDWARE_RESET) + _send_modem_event(api, xbee_t.ModemStatus.HARDWARE_RESET) assert api.is_running is False assert api.reset_event.is_set() is True @@ -371,26 +372,28 @@ def test_handle_explicit_rx_indicator(api): def _handle_tx_status(api, status, wrong_frame_id=False): - status = t.TXStatus(status) + status = xbee_t.TXStatus(status) frame_id = 0x12 send_fut = mock.MagicMock(spec=asyncio.Future) api._awaiting[frame_id] = (send_fut,) s = mock.sentinel if wrong_frame_id: frame_id += 1 - api._handle_tx_status(frame_id, s.dst_nwk, s.retries, status, t.DiscoveryStatus()) + api._handle_tx_status( + frame_id, s.dst_nwk, s.retries, status, xbee_t.DiscoveryStatus() + ) return send_fut def test_handle_tx_status_success(api): - fut = _handle_tx_status(api, t.TXStatus.SUCCESS) + fut = _handle_tx_status(api, xbee_t.TXStatus.SUCCESS) assert len(api._awaiting) == 0 assert fut.set_result.call_count == 1 assert fut.set_exception.call_count == 0 def test_handle_tx_status_except(api): - fut = _handle_tx_status(api, t.TXStatus.ADDRESS_NOT_FOUND) + fut = _handle_tx_status(api, xbee_t.TXStatus.ADDRESS_NOT_FOUND) assert len(api._awaiting) == 0 assert fut.set_result.call_count == 0 assert fut.set_exception.call_count == 1 @@ -404,7 +407,7 @@ def test_handle_tx_status_unexpected(api): def test_handle_tx_status_duplicate(api): - status = t.TXStatus.SUCCESS + status = xbee_t.TXStatus.SUCCESS frame_id = 0x12 send_fut = mock.MagicMock(spec=asyncio.Future) send_fut.set_result.side_effect = asyncio.InvalidStateError @@ -418,16 +421,16 @@ def test_handle_tx_status_duplicate(api): def test_handle_registration_status(api): frame_id = 0x12 - status = xbee_api.RegistrationStatus.SUCCESS + status = xbee_t.RegistrationStatus.SUCCESS fut = asyncio.Future() api._awaiting[frame_id] = (fut,) api._handle_registration_status(frame_id, status) assert fut.done() is True - assert fut.result() == xbee_api.RegistrationStatus.SUCCESS + assert fut.result() == xbee_t.RegistrationStatus.SUCCESS assert fut.exception() is None frame_id = 0x13 - status = xbee_api.RegistrationStatus.KEY_TABLE_IS_FULL + status = xbee_t.RegistrationStatus.KEY_TABLE_IS_FULL fut = asyncio.Future() api._awaiting[frame_id] = (fut,) api._handle_registration_status(frame_id, status) diff --git a/tests/test_application.py b/tests/test_application.py index af9b76e..c210c12 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -6,7 +6,7 @@ import zigpy.types as t import zigpy.zdo.types as zdo_t -from zigpy_xbee.api import ModemStatus, XBee +from zigpy_xbee.api import XBee import zigpy_xbee.config as config import zigpy_xbee.types as xbee_t from zigpy_xbee.zigbee import application @@ -74,10 +74,10 @@ def app(monkeypatch): def test_modem_status(app): - assert 0x00 in ModemStatus.__members__.values() - app.handle_modem_status(ModemStatus(0x00)) - assert 0xEE not in ModemStatus.__members__.values() - app.handle_modem_status(ModemStatus(0xEE)) + assert 0x00 in xbee_t.ModemStatus.__members__.values() + app.handle_modem_status(xbee_t.ModemStatus(0x00)) + assert 0xEE not in xbee_t.ModemStatus.__members__.values() + app.handle_modem_status(xbee_t.ModemStatus(0xEE)) def _test_rx( diff --git a/tests/test_types.py b/tests/test_types.py index 72efcb7..8e3e949 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,36 +1,17 @@ import pytest +import zigpy.types as t -from zigpy_xbee import types as t - - -def test_deserialize(): - extra = b"\xBE\xEF" - data = b"\xff\xff\xfe01234567" - schema = (t.uint8_t, t.int16s, t.EUI64) - result, rest = t.deserialize(data + extra, schema) - - assert rest == extra - assert result[0] == 0xFF - assert result[1] == -2 - assert result[2] == t.EUI64((0x37, 0x36, 0x35, 0x34, 0x33, 0x32, 0x31, 0x30)) - - -def test_serialize(): - data = [0xFF, -2, t.EUI64([t.uint8_t(i) for i in range(0x30, 0x38)])] - schema = (t.uint8_t, t.int16s, t.EUI64) - result = t.serialize(data, schema) - - assert result == b"\xff\xff\xfe76543210" +import zigpy_xbee.types as xbee_t def test_bytes_serialize(): data = 0x89AB.to_bytes(4, "big") - result = t.Bytes(data).serialize() + result = xbee_t.Bytes(data).serialize() assert result == data def test_bytes_deserialize(): - data, rest = t.Bytes.deserialize(0x89AB.to_bytes(3, "big")) + data, rest = xbee_t.Bytes.deserialize(0x89AB.to_bytes(3, "big")) assert data == b"\x00\x89\xAB" assert rest == b"" @@ -38,14 +19,14 @@ def test_bytes_deserialize(): def test_atcommand(): cmd = b"AI" data = 0x06.to_bytes(4, "big") - r_cmd, r_data = t.ATCommand.deserialize(cmd + data) + r_cmd, r_data = xbee_t.ATCommand.deserialize(cmd + data) assert r_cmd == cmd assert r_data == data def test_undefined_enum_undefined_value(): - class undEnum(t.uint8_t, t.UndefinedEnum): + class undEnum(t.uint8_t, xbee_t.UndefinedEnum): OK = 0 ERROR = 2 UNDEFINED_VALUE = 0xFF @@ -67,7 +48,7 @@ class undEnum(t.uint8_t, t.UndefinedEnum): def test_undefined_enum_undefinede(): - class undEnum(t.uint8_t, t.UndefinedEnum): + class undEnum(t.uint8_t, xbee_t.UndefinedEnum): OK = 0 ERROR = 2 UNDEFINED_VALUE = 0xFF @@ -77,7 +58,22 @@ class undEnum(t.uint8_t, t.UndefinedEnum): def test_nwk(): - nwk = t.NWK(0x1234) + nwk = xbee_t.NWK(0x1234) assert str(nwk) == "0x1234" assert repr(nwk) == "0x1234" + + +def test_eui64(): + extra = b"\xBE\xEF" + data = b"01234567" + + result, rest = xbee_t.EUI64.deserialize(data + extra) + + assert rest == extra + assert result == xbee_t.EUI64((0x37, 0x36, 0x35, 0x34, 0x33, 0x32, 0x31, 0x30)) + + data = xbee_t.EUI64([t.uint8_t(i) for i in range(0x30, 0x38)]) + result = data.serialize() + + assert result == b"76543210" diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index a28ec2a..35ae71d 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -7,11 +7,12 @@ import serial from zigpy.exceptions import APIException, DeliveryError +import zigpy.types as t import zigpy_xbee from zigpy_xbee.config import CONF_DEVICE_BAUDRATE, CONF_DEVICE_PATH, SCHEMA_DEVICE -from . import types as t, uart +from . import types as xbee_t, uart LOGGER = logging.getLogger(__name__) @@ -20,132 +21,66 @@ PROBE_TIMEOUT = 45 -class ModemStatus(t.uint8_t, t.UndefinedEnum): - HARDWARE_RESET = 0x00 - WATCHDOG_TIMER_RESET = 0x01 - JOINED_NETWORK = 0x02 - DISASSOCIATED = 0x03 - CONFIGURATION_ERROR_SYNCHRONIZATION_LOST = 0x04 - COORDINATOR_REALIGNMENT = 0x05 - COORDINATOR_STARTED = 0x06 - NETWORK_SECURITY_KEY_UPDATED = 0x07 - NETWORK_WOKE_UP = 0x0B - NETWORK_WENT_TO_SLEEP = 0x0C - VOLTAGE_SUPPLY_LIMIT_EXCEEDED = 0x0D - DEVICE_CLOUD_CONNECTED = 0x0E - DEVICE_CLOUD_DISCONNECTED = 0x0F - MODEM_KEY_ESTABLISHED = 0x10 - MODEM_CONFIGURATION_CHANGED_WHILE_JOIN_IN_PROGRESS = 0x11 - ACCESS_FAULT = 0x12 - FATAL_STACK_ERROR = 0x13 - PLKE_TABLE_INITIATED = 0x14 - PLKE_TABLE_SUCCESS = 0x15 - PLKE_TABLE_IS_FULL = 0x16 - PLKE_NOT_AUTHORIZED = 0x17 - PLKE_INVALID_TRUST_CENTER_REQUEST = 0x18 - PLKE_TRUST_CENTER_UPDATE_FAIL = 0x19 - PLKE_BAD_EUI_ADDRESS = 0x1A - PLKE_LINK_KEY_REJECTED = 0x1B - PLKE_UPDATE_OCCURED = 0x1C - PLKE_LINK_KEY_TABLE_CLEAR = 0x1D - ZIGBEE_FREQUENCY_AGILITY_HAS_REQUESTED_CHANNEL_CHANGE = 0x1E - ZIGBEE_EXECUTE_ATFR_NO_JOINABLE_BEACON_RESPONSES = 0x1F - ZIGBEE_TOKENS_SPACE_RECOVERED = 0x20 - ZIGBEE_TOKENS_SPACE_UNRECOVERABLE = 0x21 - ZIGBEE_TOKENS_SPACE_CORRUPTED = 0x22 - ZIGBEE_DUAL_MODE_METAFRAME_ERROR = 0x30 - BLE_CONNECT = 0x32 - BLE_DISCONNECT = 0x33 - NO_SECURE_SESSION_CONNECTION = 0x34 - CELL_COMPONENT_UPDATE_STARTED = 0x35 - CELL_COMPONENT_UPDATE_FAILED = 0x36 - CELL_COMPONENT_UPDATE_SUCCEDED = 0x37 - XBEE_FIRMWARE_UPDATE_STARTED = 0x38 - XBEE_FIRMWARE_UPDATE_FAILED = 0x39 - XBEE_WILL_RESET_TO_APPLY_FIRMWARE_UPDATE = 0x3A - SECURE_SESSION_SUCCESSFULLY_ESTABLISHED = 0x3B - SECURE_SESSION_ENDED = 0x3C - SECURE_SESSION_AUTHENTICATION_FAILED = 0x3D - PAN_ID_CONFLICT_DETECTED = 0x3E - PAN_ID_UPDATED_DUE_TO_CONFLICT = 0x3F - ROUTER_PAN_ID_CHANGED_BY_COORDINATOR_DUE_TO_CONFLICT = 0x40 - NETWORK_WATCHDOG_TIMEOUT_EXPIRED_THREE_TIMES = 0x42 - JOIN_WINDOW_OPENED = 0x43 - JOIN_WINDOW_CLOSED = 0x44 - NETWORK_SECURITY_KEY_ROTATION_INITIATED = 0x45 - STACK_RESET = 0x80 - FIB_BOOTLOADER_RESET = 0x81 - SEND_OR_JOIN_COMMAND_ISSUED_WITHOUT_CONNECTING_FROM_AP = 0x82 - ACCESS_POINT_NOT_FOUND = 0x83 - PSK_NOT_CONFIGURED = 0x84 - SSID_NOT_FOUND = 0x87 - FAILED_TO_JOIN_WITH_SECURITY_ENABLED = 0x88 - COER_LOCKUP_OR_CRYSTAL_FAILURE_RESET = 0x89 - INVALID_CHANNEL = 0x8A - LOW_VOLTAGE_RESET = 0x8B - FAILED_TO_JOIN_ACCESS_POINT = 0x8E - - UNKNOWN_MODEM_STATUS = 0xFF - _UNDEFINED = 0xFF - - -class RegistrationStatus(t.uint8_t, t.UndefinedEnum): - SUCCESS = 0x00 - KEY_TOO_LONG = 0x01 - TRANSIENT_KEY_TABLE_IS_FULL = 0x18 - ADDRESS_NOT_FOUND_IN_THE_KEY_TABLE = 0xB1 - KEY_IS_INVALID_OR_RESERVED = 0xB2 - INVALID_ADDRESS = 0xB3 - KEY_TABLE_IS_FULL = 0xB4 - SECURITY_DATA_IS_INVALID_INSTALL_CODE_CRC_FAILS = 0xBD - - UNKNOWN_MODEM_STATUS = 0xFF - _UNDEFINED = 0xFF - - # https://www.digi.com/resources/documentation/digidocs/PDFs/90000976.pdf COMMAND_REQUESTS = { - "at": (0x08, (t.FrameId, t.ATCommand, t.Bytes), 0x88), - "queued_at": (0x09, (t.FrameId, t.ATCommand, t.Bytes), 0x88), + "at": (0x08, (xbee_t.FrameId, xbee_t.ATCommand, xbee_t.Bytes), 0x88), + "queued_at": (0x09, (xbee_t.FrameId, xbee_t.ATCommand, xbee_t.Bytes), 0x88), "remote_at": ( 0x17, - (t.FrameId, t.EUI64, t.NWK, t.uint8_t, t.ATCommand, t.Bytes), + ( + xbee_t.FrameId, + xbee_t.EUI64, + xbee_t.NWK, + t.uint8_t, + xbee_t.ATCommand, + xbee_t.Bytes, + ), 0x97, ), "tx": (0x10, (), None), "tx_explicit": ( 0x11, ( - t.FrameId, - t.EUI64, - t.NWK, + xbee_t.FrameId, + xbee_t.EUI64, + xbee_t.NWK, t.uint8_t, t.uint8_t, - t.uint16_t, - t.uint16_t, + t.uint16_t_be, + t.uint16_t_be, t.uint8_t, t.uint8_t, - t.Bytes, + xbee_t.Bytes, ), 0x8B, ), "create_source_route": ( 0x21, - (t.FrameId, t.EUI64, t.NWK, t.uint8_t, t.Relays), + (xbee_t.FrameId, xbee_t.EUI64, xbee_t.NWK, t.uint8_t, xbee_t.Relays), None, ), "register_joining_device": ( 0x24, - (t.FrameId, t.EUI64, t.uint16_t, t.uint8_t, t.Bytes), + (xbee_t.FrameId, xbee_t.EUI64, t.uint16_t_be, t.uint8_t, xbee_t.Bytes), 0xA4, ), } COMMAND_RESPONSES = { - "at_response": (0x88, (t.FrameId, t.ATCommand, t.uint8_t, t.Bytes), None), - "modem_status": (0x8A, (ModemStatus,), None), + "at_response": ( + 0x88, + (xbee_t.FrameId, xbee_t.ATCommand, t.uint8_t, xbee_t.Bytes), + None, + ), + "modem_status": (0x8A, (xbee_t.ModemStatus,), None), "tx_status": ( 0x8B, - (t.FrameId, t.NWK, t.uint8_t, t.TXStatus, t.DiscoveryStatus), + ( + xbee_t.FrameId, + xbee_t.NWK, + t.uint8_t, + xbee_t.TXStatus, + xbee_t.DiscoveryStatus, + ), None, ), "route_information": (0x8D, (), None), @@ -153,74 +88,85 @@ class RegistrationStatus(t.uint8_t, t.UndefinedEnum): "explicit_rx_indicator": ( 0x91, ( - t.EUI64, - t.NWK, + xbee_t.EUI64, + xbee_t.NWK, t.uint8_t, t.uint8_t, - t.uint16_t, - t.uint16_t, + t.uint16_t_be, + t.uint16_t_be, t.uint8_t, - t.Bytes, + xbee_t.Bytes, ), None, ), "rx_io_data_long_addr": (0x92, (), None), "remote_at_response": ( 0x97, - (t.FrameId, t.EUI64, t.NWK, t.ATCommand, t.uint8_t, t.Bytes), + ( + xbee_t.FrameId, + xbee_t.EUI64, + xbee_t.NWK, + xbee_t.ATCommand, + t.uint8_t, + xbee_t.Bytes, + ), None, ), "extended_status": (0x98, (), None), - "route_record_indicator": (0xA1, (t.EUI64, t.NWK, t.uint8_t, t.Relays), None), - "many_to_one_rri": (0xA3, (t.EUI64, t.NWK, t.uint8_t), None), - "registration_status": (0xA4, (t.FrameId, RegistrationStatus), None), + "route_record_indicator": ( + 0xA1, + (xbee_t.EUI64, xbee_t.NWK, t.uint8_t, xbee_t.Relays), + None, + ), + "many_to_one_rri": (0xA3, (xbee_t.EUI64, xbee_t.NWK, t.uint8_t), None), + "registration_status": (0xA4, (xbee_t.FrameId, xbee_t.RegistrationStatus), None), "node_id_indicator": (0x95, (), None), } # https://www.digi.com/resources/documentation/digidocs/pdfs/90001539.pdf pg 175 AT_COMMANDS = { # Addressing commands - "DH": t.uint32_t, - "DL": t.uint32_t, - "MY": t.uint16_t, - "MP": t.uint16_t, - "NC": t.uint32_t, # 0 - MAX_CHILDREN. - "SH": t.uint32_t, - "SL": t.uint32_t, + "DH": t.uint32_t_be, + "DL": t.uint32_t_be, + "MY": t.uint16_t_be, + "MP": t.uint16_t_be, + "NC": t.uint32_t_be, # 0 - MAX_CHILDREN. + "SH": t.uint32_t_be, + "SL": t.uint32_t_be, "NI": t, # 20 byte printable ascii string "SE": t.uint8_t, "DE": t.uint8_t, - "CI": t.uint16_t, + "CI": t.uint16_t_be, "TO": t.uint8_t, - "NP": t.uint16_t, - "DD": t.uint32_t, + "NP": t.uint16_t_be, + "DD": t.uint32_t_be, "CR": t.uint8_t, # 0 - 0x3F # Networking commands "CH": t.uint8_t, # 0x0B - 0x1A "DA": t, # no param - "ID": t.uint64_t, - "OP": t.uint64_t, + "ID": t.uint64_t_be, + "OP": t.uint64_t_be, "NH": t.uint8_t, "BH": t.uint8_t, # 0 - 0x1E - "OI": t.uint16_t, + "OI": t.uint16_t_be, "NT": t.uint8_t, # 0x20 - 0xFF "NO": t.uint8_t, # bitfield, 0 - 3 - "SC": t.uint16_t, # 1 - 0xFFFF + "SC": t.uint16_t_be, # 1 - 0xFFFF "SD": t.uint8_t, # 0 - 7 "ZS": t.uint8_t, # 0 - 2 "NJ": t.uint8_t, "JV": t.Bool, - "NW": t.uint16_t, # 0 - 0x64FF + "NW": t.uint16_t_be, # 0 - 0x64FF "JN": t.Bool, "AR": t.uint8_t, "DJ": t.Bool, # WTF, docs - "II": t.uint16_t, + "II": t.uint16_t_be, # Security commands "EE": t.Bool, "EO": t.uint8_t, - "NK": t.Bytes, # 128-bit value - "KY": t.Bytes, # 128-bit value - "KT": t.uint16_t, # 0x1E - 0xFFFF + "NK": xbee_t.Bytes, # 128-bit value + "KY": xbee_t.Bytes, # 128-bit value + "KT": t.uint16_t_be, # 0x1E - 0xFFFF # RF interfacing commands "PL": t.uint8_t, # 0 - 4 (basically an Enum) "PM": t.Bool, @@ -237,10 +183,10 @@ class RegistrationStatus(t.uint8_t, t.UndefinedEnum): "P3": t.uint8_t, # 0 - 5 (an Enum) "P4": t.uint8_t, # 0 - 5 (an Enum) # MAC diagnostics commands - "ED": t.Bytes, # 16-byte value + "ED": xbee_t.Bytes, # 16-byte value # I/O commands - "IR": t.uint16_t, - "IC": t.uint16_t, + "IR": t.uint16_t_be, + "IC": t.uint16_t_be, "D0": t.uint8_t, # 0 - 5 (an Enum) "D1": t.uint8_t, # 0 - 5 (an Enum) "D2": t.uint8_t, # 0 - 5 (an Enum) @@ -258,31 +204,31 @@ class RegistrationStatus(t.uint8_t, t.UndefinedEnum): "P8": t.uint8_t, # 0 - 5 (an Enum) "P9": t.uint8_t, # 0 - 5 (an Enum) "LT": t.uint8_t, - "PR": t.uint16_t, + "PR": t.uint16_t_be, "RP": t.uint8_t, - "%V": t.uint16_t, # read only - "V+": t.uint16_t, - "TP": t.uint16_t, - "M0": t.uint16_t, # 0 - 0x3FF - "M1": t.uint16_t, # 0 - 0x3FF + "%V": t.uint16_t_be, # read only + "V+": t.uint16_t_be, + "TP": t.uint16_t_be, + "M0": t.uint16_t_be, # 0 - 0x3FF + "M1": t.uint16_t_be, # 0 - 0x3FF # Diagnostics commands - "VR": t.uint16_t, - "HV": t.uint16_t, + "VR": t.uint16_t_be, + "HV": t.uint16_t_be, "AI": t.uint8_t, # AT command options - "CT": t.uint16_t, # 2 - 0x028F + "CT": t.uint16_t_be, # 2 - 0x028F "CN": None, - "GT": t.uint16_t, + "GT": t.uint16_t_be, "CC": t.uint8_t, # Sleep commands "SM": t.uint8_t, - "SN": t.uint16_t, - "SP": t.uint16_t, - "ST": t.uint16_t, + "SN": t.uint16_t_be, + "SP": t.uint16_t_be, + "ST": t.uint16_t_be, "SO": t.uint8_t, - "WH": t.uint16_t, + "WH": t.uint16_t_be, "SI": None, - "PO": t.uint16_t, # 0 - 0x3E8 + "PO": t.uint16_t_be, # 0 - 0x3E8 # Execution commands "AC": None, "WR": None, @@ -291,7 +237,7 @@ class RegistrationStatus(t.uint8_t, t.UndefinedEnum): "NR": t.Bool, "CB": t.uint8_t, "ND": t, # "optional 2-Byte NI value" - "DN": t.Bytes, # "up to 20-Byte printable ASCII string" + "DN": xbee_t.Bytes, # "up to 20-Byte printable ASCII string" "IS": None, "1S": None, "AS": None, @@ -515,12 +461,15 @@ def _handle_many_to_one_rri(self, ieee, nwk, reserved): def _handle_modem_status(self, status): LOGGER.debug("Handle modem status frame: %s", status) status = status - if status == ModemStatus.COORDINATOR_STARTED: + if status == xbee_t.ModemStatus.COORDINATOR_STARTED: self.coordinator_started_event.set() - elif status in (ModemStatus.HARDWARE_RESET, ModemStatus.WATCHDOG_TIMER_RESET): + elif status in ( + xbee_t.ModemStatus.HARDWARE_RESET, + xbee_t.ModemStatus.WATCHDOG_TIMER_RESET, + ): self.reset_event.set() self.coordinator_started_event.clear() - elif status == ModemStatus.DISASSOCIATED: + elif status == xbee_t.ModemStatus.DISASSOCIATED: self.coordinator_started_event.clear() if self._app: @@ -559,9 +508,9 @@ def _handle_tx_status(self, frame_id, nwk, tries, tx_status, dsc_status): try: if tx_status in ( - t.TXStatus.BROADCAST_APS_TX_ATTEMPT, - t.TXStatus.SELF_ADDRESSED, - t.TXStatus.SUCCESS, + xbee_t.TXStatus.BROADCAST_APS_TX_ATTEMPT, + xbee_t.TXStatus.SELF_ADDRESSED, + xbee_t.TXStatus.SUCCESS, ): fut.set_result(tx_status) else: diff --git a/zigpy_xbee/types.py b/zigpy_xbee/types.py index 5985c06..887337d 100644 --- a/zigpy_xbee/types.py +++ b/zigpy_xbee/types.py @@ -1,18 +1,6 @@ import enum -import zigpy.types - - -def deserialize(data, schema): - result = [] - for type_ in schema: - value, data = type_.deserialize(data) - result.append(value) - return result, data - - -def serialize(data, schema): - return b"".join(t(v).serialize() for t, v in zip(schema, data)) +import zigpy.types as t class Bytes(bytes): @@ -30,96 +18,7 @@ def deserialize(cls, data): return cls(data[:2]), data[2:] -class int_t(int): - _signed = True - - def serialize(self): - return self.to_bytes(self._size, "big", signed=self._signed) - - @classmethod - def deserialize(cls, data): - # Work around https://bugs.python.org/issue23640 - r = cls(int.from_bytes(data[: cls._size], "big", signed=cls._signed)) - data = data[cls._size :] - return r, data - - -class int8s(int_t): - _size = 1 - - -class int16s(int_t): - _size = 2 - - -class int24s(int_t): - _size = 3 - - -class int32s(int_t): - _size = 4 - - -class int40s(int_t): - _size = 5 - - -class int48s(int_t): - _size = 6 - - -class int56s(int_t): - _size = 7 - - -class int64s(int_t): - _size = 8 - - -class uint_t(int_t): - _signed = False - - -class uint8_t(uint_t): - _size = 1 - - -class uint16_t(uint_t): - _size = 2 - - -class uint24_t(uint_t): - _size = 3 - - -class uint32_t(uint_t): - _size = 4 - - -class uint40_t(uint_t): - _size = 5 - - -class uint48_t(uint_t): - _size = 6 - - -class uint56_t(uint_t): - _size = 7 - - -class uint64_t(uint_t): - _size = 8 - - -class Bool(uint8_t, enum.Enum): - # Boolean type with values true and false. - - false = 0x00 # An alias for zero, used for clarity. - true = 0x01 # An alias for one, used for clarity. - - -class EUI64(zigpy.types.EUI64): +class EUI64(t.EUI64): @classmethod def deserialize(cls, data): r, data = super().deserialize(data) @@ -149,11 +48,11 @@ class UndefinedEnum(enum.Enum, metaclass=UndefinedEnumMeta): pass -class FrameId(uint8_t): +class FrameId(t.uint8_t): pass -class NWK(uint16_t): +class NWK(t.uint16_t_be): def __repr__(self): return f"0x{self:04x}" @@ -161,15 +60,15 @@ def __str__(self): return f"0x{self:04x}" -class Relays(zigpy.types.LVList, item_type=NWK, length_type=uint8_t): +class Relays(t.LVList, item_type=NWK, length_type=t.uint8_t): """List of Relays.""" -UNKNOWN_IEEE = EUI64([uint8_t(0xFF) for i in range(0, 8)]) +UNKNOWN_IEEE = EUI64([t.uint8_t(0xFF) for i in range(0, 8)]) UNKNOWN_NWK = NWK(0xFFFE) -class TXStatus(uint8_t, UndefinedEnum): +class TXStatus(t.uint8_t, UndefinedEnum): """TX Status frame.""" SUCCESS = 0x00 # Standard @@ -230,7 +129,7 @@ class TXStatus(uint8_t, UndefinedEnum): _UNDEFINED = 0x2C -class DiscoveryStatus(uint8_t, UndefinedEnum): +class DiscoveryStatus(t.uint8_t, UndefinedEnum): """Discovery status of TX Status frame.""" SUCCESS = 0x00 @@ -241,9 +140,92 @@ class DiscoveryStatus(uint8_t, UndefinedEnum): _UNDEFINED = 0x00 -class TXOptions(zigpy.types.bitmap8): +class TXOptions(t.bitmap8): NONE = 0x00 Disable_Retries_and_Route_Repair = 0x01 Enable_APS_Encryption = 0x20 Use_Extended_TX_Timeout = 0x40 + + +class ModemStatus(t.uint8_t, UndefinedEnum): + HARDWARE_RESET = 0x00 + WATCHDOG_TIMER_RESET = 0x01 + JOINED_NETWORK = 0x02 + DISASSOCIATED = 0x03 + CONFIGURATION_ERROR_SYNCHRONIZATION_LOST = 0x04 + COORDINATOR_REALIGNMENT = 0x05 + COORDINATOR_STARTED = 0x06 + NETWORK_SECURITY_KEY_UPDATED = 0x07 + NETWORK_WOKE_UP = 0x0B + NETWORK_WENT_TO_SLEEP = 0x0C + VOLTAGE_SUPPLY_LIMIT_EXCEEDED = 0x0D + DEVICE_CLOUD_CONNECTED = 0x0E + DEVICE_CLOUD_DISCONNECTED = 0x0F + MODEM_KEY_ESTABLISHED = 0x10 + MODEM_CONFIGURATION_CHANGED_WHILE_JOIN_IN_PROGRESS = 0x11 + ACCESS_FAULT = 0x12 + FATAL_STACK_ERROR = 0x13 + PLKE_TABLE_INITIATED = 0x14 + PLKE_TABLE_SUCCESS = 0x15 + PLKE_TABLE_IS_FULL = 0x16 + PLKE_NOT_AUTHORIZED = 0x17 + PLKE_INVALID_TRUST_CENTER_REQUEST = 0x18 + PLKE_TRUST_CENTER_UPDATE_FAIL = 0x19 + PLKE_BAD_EUI_ADDRESS = 0x1A + PLKE_LINK_KEY_REJECTED = 0x1B + PLKE_UPDATE_OCCURED = 0x1C + PLKE_LINK_KEY_TABLE_CLEAR = 0x1D + ZIGBEE_FREQUENCY_AGILITY_HAS_REQUESTED_CHANNEL_CHANGE = 0x1E + ZIGBEE_EXECUTE_ATFR_NO_JOINABLE_BEACON_RESPONSES = 0x1F + ZIGBEE_TOKENS_SPACE_RECOVERED = 0x20 + ZIGBEE_TOKENS_SPACE_UNRECOVERABLE = 0x21 + ZIGBEE_TOKENS_SPACE_CORRUPTED = 0x22 + ZIGBEE_DUAL_MODE_METAFRAME_ERROR = 0x30 + BLE_CONNECT = 0x32 + BLE_DISCONNECT = 0x33 + NO_SECURE_SESSION_CONNECTION = 0x34 + CELL_COMPONENT_UPDATE_STARTED = 0x35 + CELL_COMPONENT_UPDATE_FAILED = 0x36 + CELL_COMPONENT_UPDATE_SUCCEDED = 0x37 + XBEE_FIRMWARE_UPDATE_STARTED = 0x38 + XBEE_FIRMWARE_UPDATE_FAILED = 0x39 + XBEE_WILL_RESET_TO_APPLY_FIRMWARE_UPDATE = 0x3A + SECURE_SESSION_SUCCESSFULLY_ESTABLISHED = 0x3B + SECURE_SESSION_ENDED = 0x3C + SECURE_SESSION_AUTHENTICATION_FAILED = 0x3D + PAN_ID_CONFLICT_DETECTED = 0x3E + PAN_ID_UPDATED_DUE_TO_CONFLICT = 0x3F + ROUTER_PAN_ID_CHANGED_BY_COORDINATOR_DUE_TO_CONFLICT = 0x40 + NETWORK_WATCHDOG_TIMEOUT_EXPIRED_THREE_TIMES = 0x42 + JOIN_WINDOW_OPENED = 0x43 + JOIN_WINDOW_CLOSED = 0x44 + NETWORK_SECURITY_KEY_ROTATION_INITIATED = 0x45 + STACK_RESET = 0x80 + FIB_BOOTLOADER_RESET = 0x81 + SEND_OR_JOIN_COMMAND_ISSUED_WITHOUT_CONNECTING_FROM_AP = 0x82 + ACCESS_POINT_NOT_FOUND = 0x83 + PSK_NOT_CONFIGURED = 0x84 + SSID_NOT_FOUND = 0x87 + FAILED_TO_JOIN_WITH_SECURITY_ENABLED = 0x88 + COER_LOCKUP_OR_CRYSTAL_FAILURE_RESET = 0x89 + INVALID_CHANNEL = 0x8A + LOW_VOLTAGE_RESET = 0x8B + FAILED_TO_JOIN_ACCESS_POINT = 0x8E + + UNKNOWN_MODEM_STATUS = 0xFF + _UNDEFINED = 0xFF + + +class RegistrationStatus(t.uint8_t, UndefinedEnum): + SUCCESS = 0x00 + KEY_TOO_LONG = 0x01 + TRANSIENT_KEY_TABLE_IS_FULL = 0x18 + ADDRESS_NOT_FOUND_IN_THE_KEY_TABLE = 0xB1 + KEY_IS_INVALID_OR_RESERVED = 0xB2 + INVALID_ADDRESS = 0xB3 + KEY_TABLE_IS_FULL = 0xB4 + SECURITY_DATA_IS_INVALID_INSTALL_CODE_CRC_FAILS = 0xBD + + UNKNOWN_MODEM_STATUS = 0xFF + _UNDEFINED = 0xFF