diff --git a/README.md b/README.md index 91bd023..3349eed 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,10 @@ Options: --bootloader-baudrate NUMBERS [default: 115200] --cpc-baudrate NUMBERS [default: 460800, 115200, 230400] --ezsp-baudrate NUMBERS [default: 115200] - --spinel-baudrate NUMBERS [default: 460800] - --probe-method TEXT [default: bootloader, cpc, ezsp, spinel] + --router-baudrate NUMBERS [default: 115200] + --spinel-baudrate NUMBERS [default: 460800] + --probe-method TEXT [default: bootloader, cpc, ezsp, spinel, + router] --bootloader-reset [yellow|ihost|slzb07|sonoff] --help Show this message and exit. diff --git a/universal_silabs_flasher/common.py b/universal_silabs_flasher/common.py index 3a7e988..d46d3cb 100644 --- a/universal_silabs_flasher/common.py +++ b/universal_silabs_flasher/common.py @@ -181,7 +181,7 @@ class VersionComponent: @functools.total_ordering class Version: - _SEPARATORS = {".", "-", "/", "_", " build "} + _SEPARATORS = {".", "-", "/", "_", " build ", " GA build "} _SEPARATORS_REGEX = re.compile( "(" + "|".join(re.escape(s) for s in _SEPARATORS) + ")" ) diff --git a/universal_silabs_flasher/const.py b/universal_silabs_flasher/const.py index 201797a..28263a1 100644 --- a/universal_silabs_flasher/const.py +++ b/universal_silabs_flasher/const.py @@ -3,6 +3,7 @@ class FirmwareImageType(enum.Enum): ZIGBEE_NCP = "zigbee_ncp" + ZIGBEE_ROUTER = "zigbee_router" OPENTHREAD_RCP = "openthread_rcp" ZWAVE_NCP = "zwave_ncp" BOOTLOADER = "bootloader" @@ -26,6 +27,7 @@ class ApplicationType(enum.Enum): CPC = "cpc" EZSP = "ezsp" SPINEL = "spinel" + ROUTER = "router" FW_IMAGE_TYPE_TO_APPLICATION_TYPE = { @@ -33,6 +35,7 @@ class ApplicationType(enum.Enum): FirmwareImageType.MULTIPAN: ApplicationType.CPC, FirmwareImageType.OPENTHREAD_RCP: ApplicationType.SPINEL, FirmwareImageType.BOOTLOADER: ApplicationType.GECKO_BOOTLOADER, + FirmwareImageType.ZIGBEE_ROUTER: ApplicationType.ROUTER, } @@ -41,6 +44,7 @@ class ApplicationType(enum.Enum): ApplicationType.CPC: [460800, 115200, 230400], ApplicationType.EZSP: [115200], ApplicationType.SPINEL: [460800], + ApplicationType.ROUTER: [115200], } diff --git a/universal_silabs_flasher/flash.py b/universal_silabs_flasher/flash.py index 00a4ce4..290ae26 100644 --- a/universal_silabs_flasher/flash.py +++ b/universal_silabs_flasher/flash.py @@ -123,6 +123,12 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context type=CommaSeparatedNumbers(), show_default=True, ) +@click.option( + "--router-baudrate", + default=DEFAULT_BAUDRATES[ApplicationType.ROUTER], + type=CommaSeparatedNumbers(), + show_default=True, +) @click.option( "--spinel-baudrate", default=DEFAULT_BAUDRATES[ApplicationType.SPINEL], @@ -149,6 +155,7 @@ def main( bootloader_baudrate: list[int], cpc_baudrate: list[int], ezsp_baudrate: list[int], + router_baudrate: list[int], spinel_baudrate: list[int], probe_method: list[ApplicationType], bootloader_reset: str | None, @@ -190,6 +197,7 @@ def main( ApplicationType.GECKO_BOOTLOADER: bootloader_baudrate, ApplicationType.CPC: cpc_baudrate, ApplicationType.EZSP: ezsp_baudrate, + ApplicationType.ROUTER: router_baudrate, ApplicationType.SPINEL: spinel_baudrate, }, probe_methods=probe_method, @@ -341,6 +349,8 @@ async def flash( if flasher.app_type == ApplicationType.EZSP: running_image_type = FirmwareImageType.ZIGBEE_NCP + elif flasher.app_type == ApplicationType.ROUTER: + running_image_type = FirmwareImageType.ZIGBEE_ROUTER elif flasher.app_type == ApplicationType.SPINEL: running_image_type = FirmwareImageType.OPENTHREAD_RCP elif flasher.app_type == ApplicationType.CPC: diff --git a/universal_silabs_flasher/flasher.py b/universal_silabs_flasher/flasher.py index 5844b93..b3e7e2d 100644 --- a/universal_silabs_flasher/flasher.py +++ b/universal_silabs_flasher/flasher.py @@ -24,6 +24,7 @@ from .firmware import FirmwareImage from .gecko_bootloader import GeckoBootloaderProtocol, NoFirmwareError from .gpio import find_gpiochip_by_label, send_gpio_pattern +from .router import RouterProtocol from .spinel import SpinelProtocol from .xmodemcrc import BLOCK_SIZE as XMODEM_BLOCK_SIZE @@ -48,6 +49,7 @@ def __init__( ApplicationType.GECKO_BOOTLOADER, ApplicationType.CPC, ApplicationType.EZSP, + ApplicationType.ROUTER, ApplicationType.SPINEL, ), device: str, @@ -103,6 +105,9 @@ def _connect_cpc(self, baudrate: int): def _connect_ezsp(self, baudrate: int): return connect_ezsp(self._device, baudrate) + def _connect_router(self, baudrate: int): + return connect_protocol(self._device, baudrate, RouterProtocol) + def _connect_spinel(self, baudrate: int): return connect_protocol(self._device, baudrate, SpinelProtocol) @@ -150,6 +155,16 @@ async def probe_ezsp(self, baudrate: int) -> ProbeResult: continue_probing=False, ) + async def probe_router(self, baudrate: int) -> ProbeResult: + async with self._connect_router(baudrate) as router: + version = await router.probe() + + return ProbeResult( + version=version, + baudrate=baudrate, + continue_probing=False, + ) + async def probe_spinel(self, baudrate: int) -> ProbeResult: async with self._connect_spinel(baudrate) as spinel: version = await spinel.probe() @@ -194,6 +209,7 @@ async def probe_app_type( ApplicationType.CPC: self.probe_cpc, ApplicationType.EZSP: self.probe_ezsp, ApplicationType.SPINEL: self.probe_spinel, + ApplicationType.ROUTER: self.probe_router, } for probe_method, baudrate in ( @@ -266,6 +282,10 @@ async def enter_bootloader(self) -> None: async with self._connect_spinel(self.app_baudrate) as spinel: async with asyncio_timeout(PROBE_TIMEOUT): await spinel.enter_bootloader() + elif self.app_type is ApplicationType.ROUTER: + async with self._connect_router(self.app_baudrate) as router: + async with asyncio_timeout(PROBE_TIMEOUT): + await router.enter_bootloader() elif self.app_type is ApplicationType.EZSP: async with self._connect_ezsp(self.app_baudrate) as ezsp: try: diff --git a/universal_silabs_flasher/router.py b/universal_silabs_flasher/router.py new file mode 100644 index 0000000..38edfd1 --- /dev/null +++ b/universal_silabs_flasher/router.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import asyncio +import enum +import logging +import re + +from zigpy.serial import SerialProtocol + +from .common import PROBE_TIMEOUT, StateMachine, Version, asyncio_timeout + +_LOGGER = logging.getLogger(__name__) + +ROUTER_INFO_REGEX = re.compile(rb"stack ver\. \[(?P.*?)\]\r\n") + + +class State(str, enum.Enum): + STARTUP = "startup" + BOOTWAIT = "bootwait" + INFO = "info" + READY = "ready" + + +class RouterCommand(bytes, enum.Enum): + INFO = b"version\r\n" + BL_REBOOT = b"bootloader reboot\r\n" + + +class RouterProtocol(SerialProtocol): + def __init__(self) -> None: + super().__init__() + self._state_machine = StateMachine( + states=list(State), + initial=State.STARTUP, + ) + self._version: str | None = None + + async def probe(self) -> Version: + """Attempt to communicate with the router.""" + async with asyncio_timeout(PROBE_TIMEOUT): + return await self.router_info() + + async def router_info(self) -> Version: + """Get the router version.""" + await self.activate_prompt() + self._state_machine.state = State.INFO + self.send_data(RouterCommand.INFO) + + await self._state_machine.wait_for_state(State.READY) + + assert self._version is not None + return Version(self._version) + + async def activate_prompt(self) -> None: + """Send enter key to activate CLI prompt.""" + if self._state_machine.state == State.STARTUP: + await asyncio.sleep(0.5) + self.send_data(b"\r\n") + await self._state_machine.wait_for_state(State.READY) + + def send_data(self, data: bytes) -> None: + assert self._transport is not None + _LOGGER.debug("Sending data %s", data) + self._transport.write(data) + + def data_received(self, data: bytes) -> None: + super().data_received(data) + + while self._buffer: + _LOGGER.debug("Parsing %s: %r", self._state_machine.state, self._buffer) + if self._state_machine.state == State.STARTUP: + if b"\n>" not in self._buffer: + return + + self._buffer.clear() + self._state_machine.state = State.READY + + if self._state_machine.state == State.INFO: + match = ROUTER_INFO_REGEX.search(self._buffer) + + if match is None: + return + + self._version = match.group("version").decode("ascii") + _LOGGER.debug("Detected version string %r", self._version) + + self._buffer.clear() + self._state_machine.state = State.READY + + elif self._state_machine.state == State.BOOTWAIT: + if b"Gecko Bootloader" not in self._buffer: + return + + _LOGGER.debug("Bootloader started") + + self._buffer.clear() + self._state_machine.state = State.READY + + elif self._state_machine.state == State.READY: + self._buffer.clear() + + async def enter_bootloader(self) -> None: + await self.activate_prompt() + self._state_machine.state = State.BOOTWAIT + + self.send_data(RouterCommand.BL_REBOOT) + await self._state_machine.wait_for_state(State.READY)