diff --git a/src/lean_spec/node/networking/gossipsub/behavior.py b/src/lean_spec/node/networking/gossipsub/behavior.py index 7546ab0b1..56fc1e335 100644 --- a/src/lean_spec/node/networking/gossipsub/behavior.py +++ b/src/lean_spec/node/networking/gossipsub/behavior.py @@ -1010,6 +1010,13 @@ async def _receive_loop(self, peer_id: PeerId, stream: QuicStreamAdapter) -> Non # Incomplete varint -- wait for more data. break + # A declared length is attacker-controlled. + # Without an upper bound, a peer can claim a huge frame + # and force us to buffer reads forever without ever decoding. + # Reject and disconnect before waiting for any of those bytes. + if length > MAX_PAYLOAD_SIZE: + raise ValueError(f"RPC frame too large: {length} > {MAX_PAYLOAD_SIZE}") + # Not enough bytes yet -- wait for the next read. if len(buffer) < varint_size + length: break diff --git a/tests/node/networking/gossipsub/test_behavior.py b/tests/node/networking/gossipsub/test_behavior.py index 0034b05d6..f45ab32a6 100644 --- a/tests/node/networking/gossipsub/test_behavior.py +++ b/tests/node/networking/gossipsub/test_behavior.py @@ -7,6 +7,7 @@ from __future__ import annotations +import logging import time import pytest @@ -33,6 +34,7 @@ SubOpts, ) from lean_spec.node.networking.gossipsub.types import MessageId, Timestamp, TopicId +from lean_spec.node.networking.varint import encode_varint from lean_spec.node.snappy import compress as snappy_compress from tests.node.networking.gossipsub.conftest import add_peer, make_behavior, make_peer @@ -1178,3 +1180,47 @@ async def test_unsubscribe_prunes_mesh_peers(self) -> None: assert sub_sends == [(p1, sub_rpc), (p2, sub_rpc)] assert {peer_id for peer_id, _ in prune_sends} == {p1, p2} assert all(rpc == prune_rpc for _, rpc in prune_sends) + + +class TestReceiveLoop: + """Tests for the incoming-RPC framing loop.""" + + @pytest.mark.asyncio + async def test_oversized_declared_frame_disconnects_without_buffering( + self, caplog: pytest.LogCaptureFixture + ) -> None: + """An over-limit declared frame length disconnects the peer after one read.""" + behavior, _ = make_behavior() + peer_id = add_peer(behavior, "peerA") + + # A peer claims a frame one byte larger than the payload cap, + # but sends none of the promised bytes. + # A bounded loop must reject on the length alone, not wait for the bytes. + oversized_frame_prefix = encode_varint(MAX_PAYLOAD_SIZE + 1) + + read_count = 0 + handled_rpcs: list[RPC] = [] + + async def fake_read(n: int | None = None) -> bytes: + nonlocal read_count + read_count += 1 + # Serve the oversized length prefix once, then empty on any later read. + return oversized_frame_prefix if read_count == 1 else b"" + + async def record_handled_rpc(_peer_id: object, rpc: RPC) -> None: + handled_rpcs.append(rpc) + + behavior._handle_rpc = record_handled_rpc # type: ignore[assignment] + + fake_stream = type("FakeStream", (), {"read": staticmethod(fake_read)})() + with caplog.at_level(logging.WARNING): + await behavior._receive_loop(peer_id, fake_stream) + + # Rejected on the declared length alone: a single read, no decode, peer gone. + assert read_count == 1 + assert handled_rpcs == [] + assert peer_id not in behavior._peers + assert caplog.messages == [ + f"Error receiving from {peer_id}: " + f"RPC frame too large: {MAX_PAYLOAD_SIZE + 1} > {MAX_PAYLOAD_SIZE}" + ]