From 5a69e9c62dfdec4f57f59a26d44217bbddd58d76 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Fri, 12 Jun 2026 15:48:40 +0200 Subject: [PATCH] fix(gossipsub): bound per-RPC frame length before buffering The incoming-RPC framing loop accepted any varint-declared frame length and waited across reads until that many bytes arrived, with no upper bound. A peer could declare a huge frame and force the buffer to grow without ever decoding, exhausting memory. Reject any frame whose declared length exceeds the existing payload cap before waiting for the bytes, matching the reqresp codec guard. The oversized frame disconnects the peer cleanly via the receive loop's existing error handling. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../node/networking/gossipsub/behavior.py | 8 ++++ .../networking/gossipsub/test_behavior.py | 48 ++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/lean_spec/node/networking/gossipsub/behavior.py b/src/lean_spec/node/networking/gossipsub/behavior.py index 05a1c37a7..0a56fe2e6 100644 --- a/src/lean_spec/node/networking/gossipsub/behavior.py +++ b/src/lean_spec/node/networking/gossipsub/behavior.py @@ -65,6 +65,7 @@ from typing import ClassVar, Final, cast from lean_spec.node.networking.config import ( + MAX_PAYLOAD_SIZE, MESSAGE_DOMAIN_INVALID_SNAPPY, MESSAGE_DOMAIN_VALID_SNAPPY, PRUNE_BACKOFF, @@ -951,6 +952,13 @@ async def _receive_loop(self, peer_id: PeerId, stream: QuicStreamAdapter) -> Non # Incomplete varint -- wait for more data. break + # A declared length is attacker-controlled. + # Without an upper bound, a peer can claim a huge frame + # and force us to buffer reads forever without ever decoding. + # Reject and disconnect before waiting for any of those bytes. + if length > MAX_PAYLOAD_SIZE: + raise ValueError(f"RPC frame too large: {length} > {MAX_PAYLOAD_SIZE}") + # Not enough bytes yet -- wait for the next read. if len(buffer) < varint_size + length: break diff --git a/tests/node/networking/gossipsub/test_behavior.py b/tests/node/networking/gossipsub/test_behavior.py index efb0fae59..173e93ba3 100644 --- a/tests/node/networking/gossipsub/test_behavior.py +++ b/tests/node/networking/gossipsub/test_behavior.py @@ -7,11 +7,12 @@ from __future__ import annotations +import logging import time import pytest -from lean_spec.node.networking.config import PRUNE_BACKOFF +from lean_spec.node.networking.config import MAX_PAYLOAD_SIZE, PRUNE_BACKOFF from lean_spec.node.networking.gossipsub.behavior import ( IDONTWANT_SIZE_THRESHOLD, GossipsubMessageEvent, @@ -31,6 +32,7 @@ SubOpts, ) from lean_spec.node.networking.gossipsub.types import MessageId, Timestamp, TopicId +from lean_spec.node.networking.varint import encode_varint from tests.node.networking.gossipsub.conftest import add_peer, make_behavior, make_peer @@ -1097,3 +1099,47 @@ async def test_unsubscribe_prunes_mesh_peers(self) -> None: assert sub_sends == [(p1, sub_rpc), (p2, sub_rpc)] assert {peer_id for peer_id, _ in prune_sends} == {p1, p2} assert all(rpc == prune_rpc for _, rpc in prune_sends) + + +class TestReceiveLoop: + """Tests for the incoming-RPC framing loop.""" + + @pytest.mark.asyncio + async def test_oversized_declared_frame_disconnects_without_buffering( + self, caplog: pytest.LogCaptureFixture + ) -> None: + """An over-limit declared frame length disconnects the peer after one read.""" + behavior, _ = make_behavior() + peer_id = add_peer(behavior, "peerA") + + # A peer claims a frame one byte larger than the payload cap, + # but sends none of the promised bytes. + # A bounded loop must reject on the length alone, not wait for the bytes. + oversized_frame_prefix = encode_varint(MAX_PAYLOAD_SIZE + 1) + + read_count = 0 + handled_rpcs: list[RPC] = [] + + async def fake_read(n: int | None = None) -> bytes: + nonlocal read_count + read_count += 1 + # Serve the oversized length prefix once, then empty on any later read. + return oversized_frame_prefix if read_count == 1 else b"" + + async def record_handled_rpc(_peer_id: object, rpc: RPC) -> None: + handled_rpcs.append(rpc) + + behavior._handle_rpc = record_handled_rpc # type: ignore[assignment] + + fake_stream = type("FakeStream", (), {"read": staticmethod(fake_read)})() + with caplog.at_level(logging.WARNING): + await behavior._receive_loop(peer_id, fake_stream) + + # Rejected on the declared length alone: a single read, no decode, peer gone. + assert read_count == 1 + assert handled_rpcs == [] + assert peer_id not in behavior._peers + assert caplog.messages == [ + f"Error receiving from {peer_id}: " + f"RPC frame too large: {MAX_PAYLOAD_SIZE + 1} > {MAX_PAYLOAD_SIZE}" + ]