diff --git a/roborock/testing/__init__.py b/roborock/testing/__init__.py new file mode 100644 index 00000000..054bcfbc --- /dev/null +++ b/roborock/testing/__init__.py @@ -0,0 +1,78 @@ +"""Testing fakes and simulators for python-roborock. + +This package provides stateful firmware simulators (e.g. `V1VacuumSimulator`), +fake transport channels (`FakeChannel`), and cloud orchestration simulators (`FakeRoborockCloud`) +to allow downstream consumers (such as Home Assistant integrations) to write high-fidelity +integration tests using the real client library classes instead of fragile top-level mocks. + +Testing Architecture & Boundaries +--------------------------------- +We fake communication at two boundaries: +1. **Network HTTP API Interception**: `FakeRoborockCloud.patch_device_manager()` routes + HTTP requests (such as discovery, login, home details) to custom mock endpoints using + `aioresponses` under the hood. No Python client methods are mocked; the real EAPI client + executes fully. +2. **Plaintext RPC Message Interception**: Device communication is intercepted at the + plaintext JSON RPC level (Layer 2). The real client classes (`V1Channel`, `MqttChannel`) + run under test, but their transport calls are intercepted by our stateful simulators. + + ┌────────────────────────────────────────────────────────┐ + │ TESTED CLIENT (REAL CODE) │ + │ │ + │ RoborockDevice / Traits / V1RpcChannel / V1Channel │ + └──────────────────────────┬─────────────────────────────┘ + │ + ROBOROCKMESSAGE PAYLOADS + (Plaintext JSON commands) + │ + ┌──────────────────────────▼─────────────────────────────┐ + │ SIMULATOR (TEST FAKE) │ + │ │ + │ FakeChannel (Intercepts publish/subscribe) │ + │ RoborockDeviceSimulator (Stateful firmware simulator) │ + └────────────────────────────────────────────────────────┘ + +Integration Usage Example +------------------------- +```python +from roborock.testing import FakeRoborockCloud, V1VacuumSimulator + +async def test_start_vacuum_service(): + # Setup cloud state and add a simulated vacuum device + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="living_room_s7", battery=100, state=RoborockStateCode.charging) + cloud.add_device(fake_device) + + # Patch channels and API calls using our cloud context manager + with cloud.patch_device_manager(): + # Create the real client manager (logins and discovers natively via mock HTTP) + manager = await create_device_manager( + user_params=UserParams(username="test_user", user_data=USER_DATA), + cache=InMemoryCache(), + ) + + # Fetch the discovered device client + devices = await manager.get_devices() + device = devices[0] + + # Trigger client start command + await device.v1_properties.command.send("app_start") + + # Assert against the simulated vacuum state + assert fake_device.state == RoborockStateCode.cleaning +``` +""" + +from roborock.testing.channel import FakeChannel +from roborock.testing.cloud import FakeRoborockCloud, FakeUserState, FakeWebApiClient +from roborock.testing.simulator import RoborockDeviceSimulator +from roborock.testing.v1_simulator import V1VacuumSimulator + +__all__ = [ + "FakeChannel", + "FakeRoborockCloud", + "FakeUserState", + "FakeWebApiClient", + "RoborockDeviceSimulator", + "V1VacuumSimulator", +] diff --git a/roborock/testing/channel.py b/roborock/testing/channel.py new file mode 100644 index 00000000..68456e5a --- /dev/null +++ b/roborock/testing/channel.py @@ -0,0 +1,122 @@ +"""Fake channel transport implementation for python-roborock. + +This module defines `FakeChannel`, which simulates low-level connection, +subscription, and publishing logic at the message boundary. It acts as an +in-memory replacement for `MqttChannel` and `LocalChannel` during testing. +""" + +from collections.abc import Callable +from unittest.mock import AsyncMock, MagicMock + +from roborock.mqtt.health_manager import HealthManager +from roborock.protocols.v1_protocol import LocalProtocolVersion +from roborock.roborock_message import RoborockMessage + + +class FakeChannel: + """A stateful, in-memory transport simulator. + + It captures all published messages in `published_messages`, maintains a registry + of active callbacks in `subscribers`, and enables tests or stateful simulators to + unconditionally push unsolicited messages using `notify_subscribers`. + + Caller API + ---------- + The public interface consists of `AsyncMock` / `MagicMock` attributes that + wrap internal implementations. Because they are mocks, callers can: + + - **Inspect calls**: ``channel.publish.assert_called_once()`` + - **Inject failures**: ``channel.publish.side_effect = RoborockException(...)`` + to simulate transport errors on the next publish. + - **Replace behavior**: ``channel.connect.side_effect = my_custom_connect`` + to substitute entirely custom logic. + - **Queue canned responses**: Append to ``channel.response_queue`` to have + the channel automatically deliver a response to subscribers on the next + publish (useful for low-level RPC request/response testing). + - **Push unsolicited messages**: Call ``channel.notify_subscribers(msg)`` + to simulate the device broadcasting a state change. + """ + + def __init__(self, is_local: bool = False): + """Initialize the fake channel.""" + self.subscribers: list[Callable[[RoborockMessage], None]] = [] + self.published_messages: list[RoborockMessage] = [] + self.response_queue: list[RoborockMessage] = [] + self._is_connected = False + self._is_local = is_local + + # Set this to an exception instance to make the next publish raise it. + # This is a convenience shortcut; callers can also replace + # ``publish.side_effect`` directly for more control. + self.publish_side_effect: Exception | None = None + + # AsyncMock wrapping _publish. Callers can replace side_effect to + # inject transport errors, e.g.: + # channel.publish.side_effect = RoborockException("timeout") + self.publish = AsyncMock(side_effect=self._publish) + + # AsyncMock wrapping _subscribe. Callers can replace side_effect to + # simulate subscription failures, e.g.: + # channel.subscribe.side_effect = RoborockException("sub failed") + self.subscribe = AsyncMock(side_effect=self._subscribe) # type: ignore[assignment] + + # AsyncMock wrapping _connect. Callers can replace side_effect to + # simulate connection failures, e.g.: + # channel.connect.side_effect = RoborockException("refused") + self.connect = AsyncMock(side_effect=self._connect) + + # MagicMock wrapping _close. Callers can assert close was called + # or inject errors on teardown. + self.close = MagicMock(side_effect=self._close) + + self.protocol_version = LocalProtocolVersion.V1 + self.restart = AsyncMock() + self.health_manager = HealthManager(self.restart) + + async def _connect(self) -> None: + self._is_connected = True + + def _close(self) -> None: + self._is_connected = False + + @property + def is_connected(self) -> bool: + """Return true if connected.""" + return self._is_connected + + @property + def is_local_connected(self) -> bool: + """Return true if locally connected.""" + return self._is_connected and self._is_local + + async def _publish(self, message: RoborockMessage) -> None: + """Default publish implementation. + + Records the message in ``published_messages`` and, if + ``response_queue`` is non-empty, pops the first response and + delivers it to all current subscribers (simulating a + request/response round-trip). + """ + self.published_messages.append(message) + if self.publish_side_effect: + raise self.publish_side_effect + if self.response_queue: + response = self.response_queue.pop(0) + self.notify_subscribers(response) + + async def _subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callable[[], None]: + """Default subscribe implementation. + + Registers the callback and returns an unsubscribe function. + """ + self.subscribers.append(callback) + return lambda: self.subscribers.remove(callback) + + def notify_subscribers(self, message: RoborockMessage) -> None: + """Deliver a message to all current subscribers. + + Use this to simulate the channel receiving an unsolicited message + from the device (e.g. a state change broadcast). + """ + for subscriber in list(self.subscribers): + subscriber(message) diff --git a/roborock/testing/cloud.py b/roborock/testing/cloud.py new file mode 100644 index 00000000..d5be4829 --- /dev/null +++ b/roborock/testing/cloud.py @@ -0,0 +1,259 @@ +"""Cloud environment simulator for python-roborock testing. + +This module provides `FakeRoborockCloud` which acts as a central registry +for all simulated devices, dynamically faking HTTP endpoints via aioresponses +to simulate physical devices connected to the Roborock Cloud. +""" + +import contextlib +import re +from typing import Any +from unittest.mock import AsyncMock, patch + +from aioresponses import CallbackResult, aioresponses + +from roborock.data import HomeData +from roborock.devices.rpc.v1_channel import create_v1_channel as original_create_v1_channel +from roborock.devices.transport.mqtt_channel import create_mqtt_channel as original_create_mqtt_channel +from roborock.testing.simulator import DEFAULT_KEY_T, RoborockDeviceSimulator +from roborock.testing.v1_simulator import V1VacuumSimulator + +# EAPI Base URL pattern constants +IOT_API_BASE_URL = r"https://.*iot\.roborock\.com/api/v1" +REST_API_BASE_URL = r"https://api-.*\.roborock\.com" + + +class FakeUserState: + """Holds the fake user account details in the cloud environment.""" + + def __init__(self): + self.username = "test_user@gmail.com" + self.nickname = "user_nickname" + self.country = "US" + self.country_code = "1" + self.region = "us" + self.home_id = 123456 + self.home_name = "Fake Home" + self.uid = 123456 + self.rruid = "abc123" + self.token = "abc123" + self.rriot_u = "user123" + self.rriot_s = "pass123" + self.rriot_h = "unknown123" + + +class FakeWebApiClient: + """Fakes the EAPI at the HTTP network boundary using aioresponses. + + Exposes attributes that allow test suites (like Home Assistant) to easily + override response payloads, status codes, and simulate API errors. + """ + + def __init__(self, cloud: "FakeRoborockCloud"): + self.cloud = cloud + self.url_by_email_status = 200 + self.url_by_email_payload: dict[str, Any] | None = None # Synthesized if None + self.login_status = 200 + self.login_payload: dict[str, Any] | None = None # Synthesized if None + self.home_detail_status = 200 + self.home_detail_payload: dict[str, Any] | None = None # Synthesized if None + self.homes_status = 200 + self.homes_payload_override: dict[str, Any] | None = None + + def get_url_by_email_payload(self) -> dict[str, Any]: + """Synthesize getUrlByEmail payload.""" + if self.url_by_email_payload is not None: + return self.url_by_email_payload + return { + "code": 200, + "data": { + "country": self.cloud.user.country, + "countrycode": self.cloud.user.country_code, + "url": f"https://{self.cloud.user.region}iot.roborock.com", + }, + "msg": "success", + } + + def get_login_payload(self) -> dict[str, Any]: + """Synthesize login payload using the cloud user profile state.""" + if self.login_payload is not None: + return self.login_payload + return { + "code": 200, + "data": { + "uid": self.cloud.user.uid, + "tokentype": "token_type", + "token": self.cloud.user.token, + "rruid": self.cloud.user.rruid, + "region": self.cloud.user.region, + "countrycode": self.cloud.user.country_code, + "country": self.cloud.user.country, + "nickname": self.cloud.user.nickname, + "rriot": { + "u": self.cloud.user.rriot_u, + "s": self.cloud.user.rriot_s, + "h": self.cloud.user.rriot_h, + "k": DEFAULT_KEY_T, + "r": { + "r": self.cloud.user.country, + "a": f"https://api-{self.cloud.user.region}.roborock.com", + "l": f"https://wood-{self.cloud.user.region}.roborock.com", + "m": f"tcp://mqtt-{self.cloud.user.region}.roborock.com:8883", + }, + }, + "tuyaDeviceState": 2, + "avatarurl": "https://files.roborock.com/iottest/default_avatar.png", + }, + "msg": "success", + } + + def get_home_detail_payload(self) -> dict[str, Any]: + """Synthesize getHomeDetail payload using the cloud home state.""" + if self.home_detail_payload is not None: + return self.home_detail_payload + return { + "code": 200, + "data": { + "deviceListOrder": None, + "id": self.cloud.user.home_id, + "name": self.cloud.user.home_name, + "rrHomeId": self.cloud.user.home_id, + "tuyaHomeId": 0, + }, + "msg": "success", + } + + def mock_requests(self, mocked: aioresponses) -> None: + """Register EAPI endpoint mocks with aioresponses.""" + # getUrlByEmail Endpoint Mocking + mocked.post( + re.compile(rf"{IOT_API_BASE_URL}/getUrlByEmail.*"), + status=self.url_by_email_status, + payload=self.get_url_by_email_payload(), + ) + + # User Logins Endpoint Mocking + mocked.post( + re.compile(rf"{IOT_API_BASE_URL}/login.*"), + status=self.login_status, + payload=self.get_login_payload(), + ) + mocked.post( + re.compile(rf"{IOT_API_BASE_URL}/loginWithCode.*"), + status=self.login_status, + payload=self.get_login_payload(), + ) + + # getHomeDetail Endpoint Mocking + mocked.get( + re.compile(rf"{IOT_API_BASE_URL}/getHomeDetail.*"), + status=self.home_detail_status, + payload=self.get_home_detail_payload(), + ) + + # Dynamic homes response callback wrapper + def get_homes_callback(url, **kwargs): + if self.homes_status != 200 or self.homes_payload_override is not None: + return CallbackResult( + status=self.homes_status, + payload=self.homes_payload_override, + ) + + devices = [] + products = [] + for server in self.cloud.servers.values(): + devices.append(server.device_info) + products.append(server.product) + + home_data = HomeData( + id=self.cloud.user.home_id, + name=self.cloud.user.home_name, + devices=devices, + products=products, + ) + return CallbackResult( + status=200, + payload={ + "api": None, + "code": 200, + "result": home_data.as_dict(), + "status": "ok", + "success": True, + }, + ) + + # getHomeDetail v2 & v3 callbacks routing + mocked.get( + re.compile(rf"{REST_API_BASE_URL}/v2/user/homes/{self.cloud.user.home_id}"), + callback=get_homes_callback, + ) + mocked.get( + re.compile(rf"{REST_API_BASE_URL}/v3/user/homes/{self.cloud.user.home_id}"), + callback=get_homes_callback, + ) + + +class FakeRoborockCloud: + """A central state object representing the Roborock Cloud environment under test.""" + + def __init__(self): + self.servers: dict[str, RoborockDeviceSimulator] = {} + self.user = FakeUserState() + self.web_api = FakeWebApiClient(self) + + def add_device(self, server: RoborockDeviceSimulator) -> None: + """Register a stateful device simulator in the cloud registry.""" + self.servers[server.duid] = server + + @contextlib.contextmanager + def patch_device_manager(self): + """Context manager to patch create_v1_channel and create_mqtt_channel. + + This automatically routes communications to the registered device simulators + and intercepts HTTP calls at the network boundary using aioresponses. + """ + + # Wrapper function for create_v1_channel + def mock_create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache): + if device.pv in ("A01", "B01"): + raise NotImplementedError( + f"Simulating protocol {device.pv} is not yet supported. " + "TODO: Implement stateful simulators for B01 (Q7/Q10) and A01 (Zeo/Dyad) devices." + ) + server = self.servers.get(device.duid) + if server is not None: + if not isinstance(server, V1VacuumSimulator): + raise TypeError( + f"Device '{device.duid}' is registered with a {type(server).__name__} " + f"simulator, but create_v1_channel requires a V1VacuumSimulator." + ) + return server.v1_channel + return original_create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache) + + # Wrapper function for create_mqtt_channel + def mock_create_mqtt_channel(user_data, mqtt_params, mqtt_session, device): + if device.pv in ("A01", "B01"): + raise NotImplementedError( + f"Simulating protocol {device.pv} is not yet supported. " + "TODO: Implement stateful simulators for B01 (Q7/Q10) and A01 (Zeo/Dyad) devices." + ) + server = self.servers.get(device.duid) + if server: + return server.mqtt_channel + return original_create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) + + # Route Web requests using the dynamic FakeWebApiClient + with aioresponses() as mocked: + self.web_api.mock_requests(mocked) + + # Patch Channel factories and rate limiters + with ( + patch( + "roborock.web_api.RoborockApiClient._login_limiter.try_acquire_async", + new=AsyncMock(return_value=True), + ), + patch("roborock.web_api.RoborockApiClient._home_data_limiter.try_acquire", return_value=True), + patch("roborock.devices.device_manager.create_v1_channel", side_effect=mock_create_v1_channel), + patch("roborock.devices.device_manager.create_mqtt_channel", side_effect=mock_create_mqtt_channel), + ): + yield diff --git a/roborock/testing/simulator.py b/roborock/testing/simulator.py new file mode 100644 index 00000000..278a5d13 --- /dev/null +++ b/roborock/testing/simulator.py @@ -0,0 +1,109 @@ +"""Base stateful device firmware simulator for python-roborock testing. + +This module defines `RoborockDeviceSimulator` which intercepts plaintext JSON RPC messages +sent over simulated channels, process them through a local state engine, update internal +variables, and write responses back to client subscribers. +""" + +import logging + +from roborock.data import HomeDataDevice, HomeDataProduct, RoborockCategory +from roborock.roborock_message import RoborockMessage +from roborock.testing.channel import FakeChannel + +_LOGGER = logging.getLogger(__name__) + +# Shared authentication key constants +DEFAULT_LOCAL_KEY = "fake_localkey_16bytes" +DEFAULT_KEY_T = "qiCNieZa" + + +class RoborockDeviceSimulator: + """Base class for stateful device firmware simulators. + + It sets up an MQTT fake transport channel (and optionally a local channel), + intercepts published requests, and routes them to `_handle_publish` to + simulate real device response. + + Not all protocols support local connections. V1 devices use both MQTT and + local channels, while A01/B01 devices use MQTT only. Subclasses that need + a local channel should set ``has_local_channel=True`` (the default for + backward compatibility with V1 simulators). + + Caller API + ---------- + Subclasses (like ``RoborockVacuumSimulator``) provide the high-level + interface (state attributes, ``trigger_push_update()``, etc.), but callers + can also reach into the underlying channels for low-level inspection: + + - **Inspect published messages**: ``simulator.mqtt_channel.published_messages`` + (and ``simulator.local_channel.published_messages`` for V1) contain every + ``RoborockMessage`` that the client sent through each transport. + - **Inject transport failures**: Set + ``simulator.mqtt_channel.publish_side_effect = RoborockException(...)`` + to make the next publish raise, simulating a network error. + - **Modify device identity**: Override ``simulator.device_info`` or + ``simulator.product`` before registering with ``FakeRoborockCloud`` to + control the device metadata returned during discovery. + """ + + def __init__( + self, + duid: str = "fake_duid", + device_info: HomeDataDevice | None = None, + product: HomeDataProduct | None = None, + has_local_channel: bool = True, + ): + self.duid = duid + self.product = product or HomeDataProduct( + id=f"product_{self.duid}", + name="Roborock Vacuum", + model="roborock.vacuum.s7", + category=RoborockCategory.VACUUM, + ) + self.device_info = device_info or HomeDataDevice( + duid=self.duid, + name=f"Vacuum {self.duid}", + local_key=DEFAULT_LOCAL_KEY, + product_id=self.product.id, + sn="fake_serial_number", + pv="1.0", + ) + + # MQTT channel is always present — all protocols use it. + self.mqtt_channel = FakeChannel(is_local=False) + self.mqtt_channel.publish.side_effect = self._handle_mqtt_publish + + # Local channel is only used by V1 devices. A01/B01 (MQTT-only) + # simulators should pass has_local_channel=False. + self.local_channel: FakeChannel | None = None + if has_local_channel: + self.local_channel = FakeChannel(is_local=True) + self.local_channel.publish.side_effect = self._handle_local_publish + + async def _handle_local_publish(self, message: RoborockMessage) -> None: + assert self.local_channel is not None + self.local_channel.published_messages.append(message) + if self.local_channel.publish_side_effect: + raise self.local_channel.publish_side_effect + await self._handle_publish(message, self.local_channel) + + async def _handle_mqtt_publish(self, message: RoborockMessage) -> None: + self.mqtt_channel.published_messages.append(message) + if self.mqtt_channel.publish_side_effect: + raise self.mqtt_channel.publish_side_effect + await self._handle_publish(message, self.mqtt_channel) + + async def _handle_publish(self, message: RoborockMessage, channel: FakeChannel) -> None: + """To be overridden by subclasses to route commands.""" + raise NotImplementedError("Subclasses must implement _handle_publish") + + def connect(self) -> None: + if self.local_channel is not None: + self.local_channel._is_connected = True + self.mqtt_channel._is_connected = True + + def close(self) -> None: + if self.local_channel is not None: + self.local_channel._is_connected = False + self.mqtt_channel._is_connected = False diff --git a/roborock/testing/v1_simulator.py b/roborock/testing/v1_simulator.py new file mode 100644 index 00000000..10bacafd --- /dev/null +++ b/roborock/testing/v1_simulator.py @@ -0,0 +1,354 @@ +"""Stateful V1/L01 vacuum device firmware simulator. + +This module provides `V1VacuumSimulator` which simulates the firmware state +machine and JSON RPC commands for V1 vacuum cleaners. +""" + +import json +import logging +import time +from collections.abc import Callable +from typing import Any +from unittest.mock import Mock + +from roborock.data import HomeDataDevice, HomeDataProduct +from roborock.data.v1 import RoborockStateCode +from roborock.devices.cache import DeviceCache, InMemoryCache +from roborock.devices.rpc.v1_channel import V1Channel +from roborock.protocols.v1_protocol import SecurityData +from roborock.roborock_message import RoborockDataProtocol, RoborockMessage, RoborockMessageProtocol +from roborock.testing.channel import FakeChannel +from roborock.testing.simulator import RoborockDeviceSimulator + +_LOGGER = logging.getLogger(__name__) + +# Simulated network details +DEFAULT_NETWORK_INFO = { + "ip": "1.1.1.1", + "ssid": "test_wifi", + "mac": "aa:bb:cc:dd:ee:ff", + "bssid": "aa:bb:cc:dd:ee:ff", + "rssi": -50, +} + +# Simulated application init parameters +DEFAULT_APP_GET_INIT_STATUS = { + "local_info": { + "name": "custom_A.03.0069_FCC", + "bom": "A.03.0069", + "location": "us", + "language": "en", + "wifiplan": "0x39", + "timezone": "US/Pacific", + "logserver": "awsusor0.fds.api.xiaomi.com", + "featureset": 1, + }, + "feature_info": [111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 122, 123, 124, 125], + "new_feature_info": 633887780925447, + "new_feature_info2": 8192, + "new_feature_info_str": "0000000000002000", + "status_info": { + "state": RoborockStateCode.charging, + "battery": 100, + "clean_time": 5610, + "clean_area": 96490000, + "error_code": 0, + "in_cleaning": 0, + "in_returning": 0, + "in_fresh_state": 1, + "lab_status": 1, + "water_box_status": 0, + "map_status": 3, + "is_locating": 0, + "lock_status": 0, + "water_box_mode": 204, + "distance_off": 0, + "water_box_carriage_status": 0, + "mop_forbidden_enable": 0, + }, +} + + +class V1VacuumSimulator(RoborockDeviceSimulator): + """Firmware simulator for a V1/L01 vacuum device. + + This class holds the simulated physical hardware state (such as battery levels, + cleaning state, fan speeds, and consumable wear). When it receives JSON RPC + commands (like `app_start` or `get_consumable`), it updates these state variables + and returns a response corresponding to the expected firmware behavior. + + Default command handlers are mapped in `self.default_handlers` and can be + overridden during initialization by passing `custom_handlers`. + """ + + def __init__( + self, + duid: str = "fake_duid", + battery: int = 100, + state: int = RoborockStateCode.charging, + fan_power: int = 102, # balanced + dnd_enabled: int = 0, + mop_mode: int = 300, + water_box_mode: int = 200, + custom_handlers: dict[str, Callable[[list[Any]], Any]] | None = None, + device_info: HomeDataDevice | None = None, + product: HomeDataProduct | None = None, + dss: int = 169, + dock_type: int = 3, + ): + super().__init__(duid=duid, device_info=device_info, product=product) + self.battery = battery + self.state = state + self.fan_power = fan_power + self.dnd_enabled = dnd_enabled + self.mop_mode = mop_mode + self.water_box_mode = water_box_mode + self.custom_handlers = custom_handlers or {} + self.dss = dss + self.dock_type = dock_type + + self.consumables = { + "main_brush_work_time": 74382, + "side_brush_work_time": 74383, + "filter_work_time": 74384, + "filter_element_work_time": 0, + "sensor_dirty_time": 74385, + "strainer_work_times": 65, + "dust_collection_work_times": 25, + "cleaning_brush_work_times": 66, + } + + self.dnd_timer = { + "start_hour": 22, + "start_minute": 0, + "end_hour": 7, + "end_minute": 0, + "enabled": 1, + } + + self.clean_summary = { + "clean_time": 74382, + "clean_area": 1159182500, + "clean_count": 31, + "dust_collection_count": 25, + "records": [1672543330, 1672458041], + } + + self.last_clean_record = { + "begin": 1672543330, + "end": 1672544638, + "duration": 1176, + "area": 20965000, + "error": 0, + "complete": 1, + "start_type": 2, + "clean_type": 3, + "finish_reason": 56, + "dust_collection_status": 1, + "avoid_count": 19, + "wash_count": 2, + "map_flag": 0, + } + + # Set up default handlers dictionary + self.default_handlers: dict[str, Callable[[Any], Any]] = { + "get_status": lambda params: [self.get_status_dict()], + "get_consumable": lambda params: [self.consumables], + "get_dnd_timer": lambda params: self.dnd_timer, + "get_clean_summary": lambda params: self.clean_summary, + "get_clean_record": lambda params: self.last_clean_record, + "app_start": self._handle_app_start, + "app_stop": self._handle_app_stop, + "app_charge": self._handle_app_charge, + "set_custom_mode": self._handle_set_custom_mode, + "set_mop_mode": self._handle_set_mop_mode, + "set_water_box_custom_mode": self._handle_set_water_box_custom_mode, + "reset_consumable": self._handle_reset_consumable, + "app_get_init_status": self._handle_app_get_init_status, + "get_network_info": self._handle_get_network_info, + } + + self.device_cache = DeviceCache(self.duid, InMemoryCache()) + self.security_data = SecurityData(endpoint="fake_endpoint", nonce=b"fake_nonce_16bytes") + local_session = Mock(return_value=self.local_channel) + + self._v1_channel = V1Channel( + device_uid=self.duid, + security_data=self.security_data, + mqtt_channel=self.mqtt_channel, # type: ignore[arg-type] + local_session=local_session, + device_cache=self.device_cache, + ) + + @property + def v1_channel(self) -> V1Channel: + """Returns the real V1Channel bound to the fake channels.""" + return self._v1_channel + + @property + def in_cleaning(self) -> int: + """Return 1 if cleaning, else 0.""" + return 1 if self.state == RoborockStateCode.cleaning else 0 + + @property + def in_returning(self) -> int: + """Return 1 if returning, else 0.""" + return 1 if self.state == RoborockStateCode.returning_home else 0 + + @property + def charge_status(self) -> int: + """Return 1 if charging, else 0.""" + return 1 if self.state == RoborockStateCode.charging else 0 + + def get_status_dict(self) -> dict[str, Any]: + """Generate status dict using the current simulated state.""" + return { + "msg_ver": 2, + "msg_seq": 458, + "state": self.state, + "battery": self.battery, + "clean_time": 1176, + "clean_area": 20965000, + "error_code": 0, + "map_present": 1, + "in_cleaning": self.in_cleaning, + "in_returning": self.in_returning, + "in_fresh_state": 1, + "lab_status": 1, + "water_box_status": 1, + "back_type": -1, + "wash_phase": 0, + "wash_ready": 0, + "fan_power": self.fan_power, + "dnd_enabled": self.dnd_enabled, + "map_status": 3, + "is_locating": 0, + "lock_status": 0, + "water_box_mode": self.water_box_mode, + "water_box_carriage_status": 1, + "mop_forbidden_enable": 1, + "camera_status": 3457, + "is_exploring": 0, + "home_sec_status": 0, + "home_sec_enable_password": 0, + "adbumper_status": [0, 0, 0], + "water_shortage_status": 0, + "grey_water_box_status": 0, + "dirty_water_box_status": 0, + "dock_type": self.dock_type, + "dust_collection_status": 0, + "auto_dust_collection": 1, + "avoid_count": 19, + "mop_mode": self.mop_mode, + "debug_mode": 0, + "collision_avoid_status": 1, + "switch_map_mode": 0, + "dock_error_status": 0, + "charge_status": self.charge_status, + "unsave_map_reason": 0, + "unsave_map_flag": 0, + "dss": self.dss, + } + + def _handle_app_start(self, params: Any) -> str: + self.state = RoborockStateCode.cleaning + return "ok" + + def _handle_app_stop(self, params: Any) -> str: + self.state = RoborockStateCode.paused + return "ok" + + def _handle_app_charge(self, params: Any) -> str: + self.state = RoborockStateCode.returning_home + return "ok" + + def _handle_set_custom_mode(self, params: Any) -> str: + if isinstance(params, list) and len(params) > 0: + self.fan_power = params[0] + elif isinstance(params, dict): + self.fan_power = params.get("fan_power", self.fan_power) + return "ok" + + def _handle_set_mop_mode(self, params: Any) -> str: + if isinstance(params, list) and len(params) > 0: + self.mop_mode = params[0] + return "ok" + + def _handle_set_water_box_custom_mode(self, params: Any) -> str: + if isinstance(params, list) and len(params) > 0: + self.water_box_mode = params[0] + return "ok" + + def _handle_reset_consumable(self, params: Any) -> str: + if isinstance(params, list) and len(params) > 0: + consumable_name = params[0] + if consumable_name in self.consumables: + self.consumables[consumable_name] = 0 + return "ok" + + def _handle_app_get_init_status(self, params: Any) -> list[dict[str, Any]]: + return [DEFAULT_APP_GET_INIT_STATUS] + + def _handle_get_network_info(self, params: Any) -> dict[str, Any]: + return DEFAULT_NETWORK_INFO + + async def _handle_publish(self, message: RoborockMessage, channel: FakeChannel) -> None: + if not message.payload: + return + + try: + payload = json.loads(message.payload.decode()) + dps = payload.get("dps", {}) + if "101" not in dps: + return + inner = json.loads(dps["101"]) + msg_id = inner["id"] + method = inner["method"] + params = inner.get("params", []) + except Exception as e: + _LOGGER.debug("Failed to parse plaintext JSON RPC payload: %s", e, exc_info=True) + return + + result = None + error = None + + # Check custom handlers override first, then fall back to default handlers + handler = self.custom_handlers.get(method) or self.default_handlers.get(method) + if handler: + try: + result = handler(params) + except Exception as e: + error = str(e) + _LOGGER.debug("Error executing command handler for %s: %s", method, e, exc_info=True) + else: + result = "ok" + + response_data = { + "dps": {"102": json.dumps({"id": msg_id, "result": result, "error": error})}, + "t": int(time.time()), + } + + response_msg = RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=json.dumps(response_data).encode(), seq=msg_id + ) + + channel.notify_subscribers(response_msg) + + def trigger_push_update(self) -> None: + """Trigger an unsolicited push state update to all subscribers.""" + dps_payload = { + str(int(RoborockDataProtocol.STATE)): self.state, + str(int(RoborockDataProtocol.BATTERY)): self.battery, + str(int(RoborockDataProtocol.FAN_POWER)): self.fan_power, + str(int(RoborockDataProtocol.WATER_BOX_MODE)): self.water_box_mode, + } + + payload = {"dps": dps_payload, "t": int(time.time())} + + push_msg = RoborockMessage( + protocol=RoborockMessageProtocol.GENERAL_RESPONSE, payload=json.dumps(payload).encode() + ) + + self.mqtt_channel.notify_subscribers(push_msg) + if self.local_channel is not None: + self.local_channel.notify_subscribers(push_msg) diff --git a/tests/fixtures/channel_fixtures.py b/tests/fixtures/channel_fixtures.py index 90ace9fa..83e5b9e6 100644 --- a/tests/fixtures/channel_fixtures.py +++ b/tests/fixtures/channel_fixtures.py @@ -1,61 +1,3 @@ -from collections.abc import Callable -from unittest.mock import AsyncMock, MagicMock +from roborock.testing import FakeChannel -from roborock.mqtt.health_manager import HealthManager -from roborock.protocols.v1_protocol import LocalProtocolVersion -from roborock.roborock_message import RoborockMessage - - -class FakeChannel: - """A fake channel that handles publish and subscribe calls.""" - - def __init__(self): - """Initialize the fake channel.""" - self.subscribers: list[Callable[[RoborockMessage], None]] = [] - self.published_messages: list[RoborockMessage] = [] - self.response_queue: list[RoborockMessage] = [] - self._is_connected = False - self.publish_side_effect: Exception | None = None - self.publish = AsyncMock(side_effect=self._publish) - self.subscribe = AsyncMock(side_effect=self._subscribe) - self.connect = AsyncMock(side_effect=self._connect) - self.close = MagicMock(side_effect=self._close) - self.protocol_version = LocalProtocolVersion.V1 - self.restart = AsyncMock() - self.health_manager = HealthManager(self.restart) - - async def _connect(self) -> None: - self._is_connected = True - - def _close(self) -> None: - self._is_connected = False - - @property - def is_connected(self) -> bool: - """Return true if connected.""" - return self._is_connected - - async def _publish(self, message: RoborockMessage) -> None: - """Simulate publishing a message and triggering a response.""" - self.published_messages.append(message) - if self.publish_side_effect: - raise self.publish_side_effect - # When a message is published, simulate a response - if self.response_queue: - response = self.response_queue.pop(0) - # Give a chance for the subscriber to be registered - for subscriber in list(self.subscribers): - subscriber(response) - - async def _subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callable[[], None]: - """Simulate subscribing to messages.""" - self.subscribers.append(callback) - return lambda: self.subscribers.remove(callback) - - def notify_subscribers(self, message: RoborockMessage) -> None: - """Notify subscribers of a message. - - This can be used by tests to simulate the channel receiving a message. - """ - for subscriber in list(self.subscribers): - subscriber(message) +__all__ = ["FakeChannel"] diff --git a/tests/testing/__init__.py b/tests/testing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/testing/test_channel.py b/tests/testing/test_channel.py new file mode 100644 index 00000000..30a0877d --- /dev/null +++ b/tests/testing/test_channel.py @@ -0,0 +1,32 @@ +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from roborock.testing import FakeChannel + + +async def test_fake_channel_direct(): + """Verify raw subscription, notification, and publish capturing on FakeChannel.""" + channel = FakeChannel() + messages = [] + + def sub(msg): + messages.append(msg) + + # Trigger message before subscription + channel.notify_subscribers(RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"pre-sub")) + assert len(messages) == 0 + + # Subscribe and notify + unsub = await channel.subscribe(sub) + channel.notify_subscribers(RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"subbed")) + assert len(messages) == 1 + assert messages[0].payload == b"subbed" + + # Unsubscribe and notify + unsub() + channel.notify_subscribers(RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"post-sub")) + assert len(messages) == 1 + + # Test publish logs + test_msg = RoborockMessage(protocol=RoborockMessageProtocol.RPC_RESPONSE, payload=b"sent") + await channel.publish(test_msg) + assert len(channel.published_messages) == 1 + assert channel.published_messages[0] == test_msg diff --git a/tests/testing/test_cloud.py b/tests/testing/test_cloud.py new file mode 100644 index 00000000..95fd6f64 --- /dev/null +++ b/tests/testing/test_cloud.py @@ -0,0 +1,98 @@ +import pytest + +from roborock.data import HomeDataDevice, HomeDataProduct, RoborockCategory, UserData +from roborock.data.v1 import RoborockStateCode +from roborock.devices.cache import InMemoryCache +from roborock.devices.device_manager import UserParams, create_device_manager +from roborock.exceptions import RoborockException +from roborock.testing import FakeRoborockCloud, V1VacuumSimulator +from roborock.web_api import RoborockApiClient +from tests import mock_data + +USER_DATA = UserData.from_dict(mock_data.USER_DATA) + + +async def test_fake_roborock_cloud(): + """Verify that FakeRoborockCloud can discover devices via fake HTTP requests and connect them.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="living_room_s7", battery=99, state=RoborockStateCode.charging) + cloud.add_device(fake_device) + + with cloud.patch_device_manager(): + manager = await create_device_manager( + user_params=UserParams(username="test_user", user_data=USER_DATA), + cache=InMemoryCache(), + ) + devices = await manager.get_devices() + + assert len(devices) == 1 + device = devices[0] + assert device.duid == "living_room_s7" + assert device.is_connected + + assert device.v1_properties is not None + await device.v1_properties.status.refresh() + assert device.v1_properties.status.battery == 99 + + +async def test_cloud_unsupported_protocol(): + """Verify that FakeRoborockCloud raises NotImplementedError for A01 or B01 devices.""" + cloud = FakeRoborockCloud() + fake_b01_server = V1VacuumSimulator( + duid="b01_vacuum", + product=HomeDataProduct( + id="product_b01", + name="Q7 Vacuum", + model="roborock.vacuum.sc", + category=RoborockCategory.VACUUM, + ), + device_info=HomeDataDevice( + duid="b01_vacuum", + name="Q7 Vacuum", + local_key="fake_localkey_16bytes", + product_id="product_b01", + pv="B01", + ), + ) + cloud.add_device(fake_b01_server) + + with cloud.patch_device_manager(): + with pytest.raises(NotImplementedError, match="Simulating protocol B01 is not yet supported"): + await create_device_manager( + user_params=UserParams(username="test_user", user_data=USER_DATA), + cache=InMemoryCache(), + ) + + +async def test_cloud_login_error_override(): + """Verify that we can override login status and payloads to test authentication failure handling.""" + cloud = FakeRoborockCloud() + cloud.web_api.login_status = 401 + cloud.web_api.login_payload = {"code": 1002, "msg": "Invalid credentials"} + + with cloud.patch_device_manager(): + client = RoborockApiClient(username="test_user@gmail.com") + with pytest.raises(RoborockException, match="Invalid credentials - response code: 1002"): + await client.pass_login("wrong_password") + + +async def test_cloud_dynamic_device_addition(): + """Verify that adding a device dynamically after patching works due to the callback API.""" + cloud = FakeRoborockCloud() + + with cloud.patch_device_manager(): + fake_device = V1VacuumSimulator(duid="dynamic_s7", battery=42) + cloud.add_device(fake_device) + + manager = await create_device_manager( + user_params=UserParams(username="test_user", user_data=USER_DATA), + cache=InMemoryCache(), + ) + devices = await manager.get_devices() + + assert len(devices) == 1 + assert devices[0].duid == "dynamic_s7" + + assert devices[0].v1_properties is not None + await devices[0].v1_properties.status.refresh() + assert devices[0].v1_properties.status.battery == 42 diff --git a/tests/testing/test_v1_simulator.py b/tests/testing/test_v1_simulator.py new file mode 100644 index 00000000..6cc91a4d --- /dev/null +++ b/tests/testing/test_v1_simulator.py @@ -0,0 +1,196 @@ +import pytest + +from roborock.data import UserData +from roborock.data.v1 import RoborockStateCode +from roborock.devices.cache import InMemoryCache +from roborock.devices.device_manager import UserParams, create_device_manager +from roborock.devices.traits.v1.consumeable import ConsumableAttribute +from roborock.exceptions import RoborockException +from roborock.testing import FakeRoborockCloud, V1VacuumSimulator +from tests import mock_data + +USER_DATA = UserData.from_dict(mock_data.USER_DATA) + + +async def _create_connected_device(cloud, fake_device): + """Helper to create a connected device from a cloud and simulator.""" + cloud.add_device(fake_device) + with cloud.patch_device_manager(): + manager = await create_device_manager( + user_params=UserParams(username="test_user", user_data=USER_DATA), + cache=InMemoryCache(), + ) + devices = await manager.get_devices() + assert len(devices) == 1 + return devices[0] + + +async def test_trait_consumable_refresh(): + """Verify that the consumable trait can be refreshed from the simulator.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_consumable") + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.consumables.refresh() + assert device.v1_properties.consumables.main_brush_work_time == 74382 + assert device.v1_properties.consumables.side_brush_work_time == 74383 + assert device.v1_properties.consumables.filter_work_time == 74384 + + +async def test_trait_consumable_reset(): + """Verify that resetting a consumable updates both the simulator and trait.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_reset") + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.consumables.refresh() + assert device.v1_properties.consumables.filter_work_time == 74384 + + # Reset the filter consumable through the trait API + await device.v1_properties.consumables.reset_consumable(ConsumableAttribute.FILTER_WORK_TIME) + + # The simulator state should be updated + assert fake_device.consumables["filter_work_time"] == 0 + # The trait auto-refreshes after reset, so the client should reflect the change + assert device.v1_properties.consumables.filter_work_time == 0 + + +async def test_trait_dnd_refresh(): + """Verify that the DND timer trait can be refreshed from the simulator.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_dnd") + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.dnd.refresh() + assert device.v1_properties.dnd.start_hour == 22 + assert device.v1_properties.dnd.end_hour == 7 + assert device.v1_properties.dnd.enabled == 1 + + +async def test_trait_fan_speed_change(): + """Verify that sending set_custom_mode updates the simulator fan speed and the trait reflects it.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_fan", fan_power=102) + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.status.refresh() + assert device.v1_properties.status.fan_power == 102 + + # Change fan speed through the command trait + await device.v1_properties.command.send("set_custom_mode", [105]) + assert fake_device.fan_power == 105 + + # Refresh status to pick up the changed value + await device.v1_properties.status.refresh() + assert device.v1_properties.status.fan_power == 105 + + +async def test_trait_clean_summary_refresh(): + """Verify that the clean summary trait can be refreshed from the simulator.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_summary") + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.clean_summary.refresh() + assert device.v1_properties.clean_summary.clean_count == 31 + assert device.v1_properties.clean_summary.dust_collection_count == 25 + + +async def test_trait_multiple_state_transitions(): + """Verify a sequence of state transitions through trait commands.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_transitions", state=RoborockStateCode.charging) + device = await _create_connected_device(cloud, fake_device) + + # Start cleaning + await device.v1_properties.command.send("app_start") + assert fake_device.state == RoborockStateCode.cleaning + + # Stop (pauses the vacuum) + await device.v1_properties.command.send("app_stop") + assert fake_device.state == RoborockStateCode.paused + + # Send it back to the dock + await device.v1_properties.command.send("app_charge") + assert fake_device.state == RoborockStateCode.returning_home + + # Verify the client sees the final state after refresh + await device.v1_properties.status.refresh() + assert device.v1_properties.status.state == RoborockStateCode.returning_home + + +async def test_trait_push_update_propagation(): + """Verify that unsolicited push updates propagate to client traits without refresh.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_push", battery=99, state=RoborockStateCode.charging) + device = await _create_connected_device(cloud, fake_device) + + await device.v1_properties.status.refresh() + assert device.v1_properties.status.battery == 99 + + # Mutate the simulator state and push an update + fake_device.battery = 45 + fake_device.state = RoborockStateCode.returning_home + fake_device.trigger_push_update() + + # The client status properties should be updated immediately without a manual refresh + assert device.v1_properties.status.battery == 45 + assert device.v1_properties.status.state == RoborockStateCode.returning_home + + +async def test_trait_custom_handler_override(): + """Verify that custom_handlers override default behavior for specific commands.""" + + def custom_get_status(params): + return [{"state": RoborockStateCode.cleaning, "battery": 77, "fan_power": 999}] + + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator( + duid="s7_custom", + custom_handlers={"get_status": custom_get_status}, + ) + device = await _create_connected_device(cloud, fake_device) + + # The custom handler returns different values than the simulator's defaults + await device.v1_properties.status.refresh() + assert device.v1_properties.status.battery == 77 + assert device.v1_properties.status.fan_power == 999 + + +async def test_trait_properties_and_dss_config(): + """Verify that properties, dss config, and dock_type config are correctly exposed on the simulator.""" + fake_device = V1VacuumSimulator(duid="s7_properties", state=RoborockStateCode.cleaning, dss=42, dock_type=5) + assert fake_device.in_cleaning == 1 + assert fake_device.in_returning == 0 + assert fake_device.charge_status == 0 + assert fake_device.dss == 42 + assert fake_device.dock_type == 5 + + fake_device.state = RoborockStateCode.returning_home + assert fake_device.in_cleaning == 0 + assert fake_device.in_returning == 1 + assert fake_device.charge_status == 0 + + fake_device.state = RoborockStateCode.charging + assert fake_device.in_cleaning == 0 + assert fake_device.in_returning == 0 + assert fake_device.charge_status == 1 + + +async def test_trait_publish_failure_injection(): + """Verify that publish_side_effect on simulator channels correctly raises errors.""" + cloud = FakeRoborockCloud() + fake_device = V1VacuumSimulator(duid="s7_failing_publish") + device = await _create_connected_device(cloud, fake_device) + + # Make local publish fail + assert fake_device.local_channel is not None + fake_device.local_channel.publish_side_effect = RoborockException("Local network error") + + # The client status refresh should still succeed by falling back to MQTT! + await device.v1_properties.status.refresh() + + # If MQTT also fails, the refresh must fail + fake_device.mqtt_channel.publish_side_effect = RoborockException("MQTT network error") + with pytest.raises(RoborockException, match="MQTT network error"): + await device.v1_properties.status.refresh()