Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/lean_spec/node/networking/gossipsub/behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,13 @@ async def _receive_loop(self, peer_id: PeerId, stream: QuicStreamAdapter) -> Non
# Incomplete varint -- wait for more data.
break

# A declared length is attacker-controlled.
# Without an upper bound, a peer can claim a huge frame
# and force us to buffer reads forever without ever decoding.
# Reject and disconnect before waiting for any of those bytes.
if length > MAX_PAYLOAD_SIZE:
raise ValueError(f"RPC frame too large: {length} > {MAX_PAYLOAD_SIZE}")

# Not enough bytes yet -- wait for the next read.
if len(buffer) < varint_size + length:
break
Expand Down
46 changes: 46 additions & 0 deletions tests/node/networking/gossipsub/test_behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import annotations

import logging
import time

import pytest
Expand All @@ -33,6 +34,7 @@
SubOpts,
)
from lean_spec.node.networking.gossipsub.types import MessageId, Timestamp, TopicId
from lean_spec.node.networking.varint import encode_varint
from lean_spec.node.snappy import compress as snappy_compress
from tests.node.networking.gossipsub.conftest import add_peer, make_behavior, make_peer

Expand Down Expand Up @@ -1178,3 +1180,47 @@ async def test_unsubscribe_prunes_mesh_peers(self) -> None:
assert sub_sends == [(p1, sub_rpc), (p2, sub_rpc)]
assert {peer_id for peer_id, _ in prune_sends} == {p1, p2}
assert all(rpc == prune_rpc for _, rpc in prune_sends)


class TestReceiveLoop:
"""Tests for the incoming-RPC framing loop."""

@pytest.mark.asyncio
async def test_oversized_declared_frame_disconnects_without_buffering(
self, caplog: pytest.LogCaptureFixture
) -> None:
"""An over-limit declared frame length disconnects the peer after one read."""
behavior, _ = make_behavior()
peer_id = add_peer(behavior, "peerA")

# A peer claims a frame one byte larger than the payload cap,
# but sends none of the promised bytes.
# A bounded loop must reject on the length alone, not wait for the bytes.
oversized_frame_prefix = encode_varint(MAX_PAYLOAD_SIZE + 1)

read_count = 0
handled_rpcs: list[RPC] = []

async def fake_read(n: int | None = None) -> bytes:
nonlocal read_count
read_count += 1
# Serve the oversized length prefix once, then empty on any later read.
return oversized_frame_prefix if read_count == 1 else b""

async def record_handled_rpc(_peer_id: object, rpc: RPC) -> None:
handled_rpcs.append(rpc)

behavior._handle_rpc = record_handled_rpc # type: ignore[assignment]

fake_stream = type("FakeStream", (), {"read": staticmethod(fake_read)})()
with caplog.at_level(logging.WARNING):
await behavior._receive_loop(peer_id, fake_stream)

# Rejected on the declared length alone: a single read, no decode, peer gone.
assert read_count == 1
assert handled_rpcs == []
assert peer_id not in behavior._peers
assert caplog.messages == [
f"Error receiving from {peer_id}: "
f"RPC frame too large: {MAX_PAYLOAD_SIZE + 1} > {MAX_PAYLOAD_SIZE}"
]
Loading