|
| 1 | +"""Fake channel transport implementation for python-roborock. |
| 2 | +
|
| 3 | +This module defines `FakeChannel`, which simulates low-level connection, |
| 4 | +subscription, and publishing logic at the message boundary. It acts as an |
| 5 | +in-memory replacement for `MqttChannel` and `LocalChannel` during testing. |
| 6 | +""" |
| 7 | + |
| 8 | +from collections.abc import Callable |
| 9 | +from unittest.mock import AsyncMock, MagicMock |
| 10 | + |
| 11 | +from roborock.devices.transport.channel import Channel |
| 12 | +from roborock.mqtt.health_manager import HealthManager |
| 13 | +from roborock.protocols.v1_protocol import LocalProtocolVersion |
| 14 | +from roborock.roborock_message import RoborockMessage |
| 15 | + |
| 16 | + |
| 17 | +class FakeChannel(Channel): |
| 18 | + """A stateful, in-memory transport simulator implementing the Channel protocol. |
| 19 | +
|
| 20 | + It captures all published messages in `published_messages`, maintains a registry |
| 21 | + of active callbacks in `subscribers`, and enables tests or stateful simulators to |
| 22 | + unconditionally push unsolicited messages using `notify_subscribers`. |
| 23 | +
|
| 24 | + Caller API |
| 25 | + ---------- |
| 26 | + The public interface consists of `AsyncMock` / `MagicMock` attributes that |
| 27 | + wrap internal implementations. Because they are mocks, callers can: |
| 28 | +
|
| 29 | + - **Inspect calls**: ``channel.publish.assert_called_once()`` |
| 30 | + - **Inject failures**: ``channel.publish.side_effect = RoborockException(...)`` |
| 31 | + to simulate transport errors on the next publish. |
| 32 | + - **Replace behavior**: ``channel.connect.side_effect = my_custom_connect`` |
| 33 | + to substitute entirely custom logic. |
| 34 | + - **Queue canned responses**: Append to ``channel.response_queue`` to have |
| 35 | + the channel automatically deliver a response to subscribers on the next |
| 36 | + publish (useful for low-level RPC request/response testing). |
| 37 | + - **Push unsolicited messages**: Call ``channel.notify_subscribers(msg)`` |
| 38 | + to simulate the device broadcasting a state change. |
| 39 | + """ |
| 40 | + |
| 41 | + def __init__(self, is_local: bool = False): |
| 42 | + """Initialize the fake channel.""" |
| 43 | + self.subscribers: list[Callable[[RoborockMessage], None]] = [] |
| 44 | + self.published_messages: list[RoborockMessage] = [] |
| 45 | + self.response_queue: list[RoborockMessage] = [] |
| 46 | + self._is_connected = False |
| 47 | + self._is_local = is_local |
| 48 | + |
| 49 | + # Set this to an exception instance to make the next publish raise it. |
| 50 | + # This is a convenience shortcut; callers can also replace |
| 51 | + # ``publish.side_effect`` directly for more control. |
| 52 | + self.publish_side_effect: Exception | None = None |
| 53 | + |
| 54 | + # AsyncMock wrapping _publish. Callers can replace side_effect to |
| 55 | + # inject transport errors, e.g.: |
| 56 | + # channel.publish.side_effect = RoborockException("timeout") |
| 57 | + self.publish = AsyncMock(side_effect=self._publish) |
| 58 | + |
| 59 | + # AsyncMock wrapping _connect. Callers can replace side_effect to |
| 60 | + # simulate connection failures, e.g.: |
| 61 | + # channel.connect.side_effect = RoborockException("refused") |
| 62 | + self.connect = AsyncMock(side_effect=self._connect) |
| 63 | + |
| 64 | + # MagicMock wrapping _close. Callers can assert close was called |
| 65 | + # or inject errors on teardown. |
| 66 | + self.close = MagicMock(side_effect=self._close) |
| 67 | + |
| 68 | + self.protocol_version = LocalProtocolVersion.V1 |
| 69 | + self.restart = AsyncMock() |
| 70 | + self.health_manager = HealthManager(self.restart) |
| 71 | + |
| 72 | + async def _connect(self) -> None: |
| 73 | + self._is_connected = True |
| 74 | + |
| 75 | + def _close(self) -> None: |
| 76 | + self._is_connected = False |
| 77 | + |
| 78 | + @property |
| 79 | + def is_connected(self) -> bool: |
| 80 | + """Return true if connected.""" |
| 81 | + return self._is_connected |
| 82 | + |
| 83 | + @property |
| 84 | + def is_local_connected(self) -> bool: |
| 85 | + """Return true if locally connected.""" |
| 86 | + return self._is_connected and self._is_local |
| 87 | + |
| 88 | + async def _publish(self, message: RoborockMessage) -> None: |
| 89 | + """Default publish implementation. |
| 90 | +
|
| 91 | + Records the message in ``published_messages`` and, if |
| 92 | + ``response_queue`` is non-empty, pops the first response and |
| 93 | + delivers it to all current subscribers (simulating a |
| 94 | + request/response round-trip). |
| 95 | + """ |
| 96 | + self.published_messages.append(message) |
| 97 | + if self.publish_side_effect: |
| 98 | + raise self.publish_side_effect |
| 99 | + if self.response_queue: |
| 100 | + response = self.response_queue.pop(0) |
| 101 | + self.notify_subscribers(response) |
| 102 | + |
| 103 | + async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callable[[], None]: |
| 104 | + """Register a callback and return an unsubscribe function. |
| 105 | +
|
| 106 | + This is a real method implementing the Channel protocol, not a mock. |
| 107 | + """ |
| 108 | + self.subscribers.append(callback) |
| 109 | + return lambda: self.subscribers.remove(callback) |
| 110 | + |
| 111 | + def notify_subscribers(self, message: RoborockMessage) -> None: |
| 112 | + """Deliver a message to all current subscribers. |
| 113 | +
|
| 114 | + Use this to simulate the channel receiving an unsolicited message |
| 115 | + from the device (e.g. a state change broadcast). |
| 116 | + """ |
| 117 | + for subscriber in list(self.subscribers): |
| 118 | + subscriber(message) |
0 commit comments