Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions s3proxy/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import structlog

from s3proxy.crypto import memory_bounded_part_size
from s3proxy.crypto import streaming_upload_peak
from s3proxy.errors import S3Error
from s3proxy.metrics import MEMORY_LIMIT_BYTES, MEMORY_REJECTIONS, MEMORY_RESERVED_BYTES

Expand Down Expand Up @@ -87,19 +87,21 @@ async def try_acquire(self, bytes_needed: int) -> int:

to_reserve = max(MIN_RESERVATION, bytes_needed)

# Single request exceeds entire budget — can never fit, reject immediately
# A single request's honest peak can exceed the whole budget: the framed
# upload path needs ~2-3x the internal part, more than a deliberately tight
# governor budget (which sits well below pod RAM to bound *concurrency*).
# Clamp to the budget so such a request runs exclusively (concurrency 1)
# rather than being refused outright -- the proxy streams it fine, it just
# can't share the budget. ponytail: a request whose true peak exceeds the
# pod's RAM (not just this budget) could still OOM when run alone; all
# realistic parts (<=~56MB) stay far under that.
if to_reserve > self._limit_bytes:
request_mb = to_reserve / 1024 / 1024
limit_mb = self._limit_bytes / 1024 / 1024
logger.warning(
"MEMORY_TOO_LARGE",
requested_mb=round(request_mb, 2),
limit_mb=round(limit_mb, 2),
)
MEMORY_REJECTIONS.inc()
raise S3Error.slow_down(
f"Request needs {request_mb:.0f}MB but budget is {limit_mb:.0f}MB"
logger.info(
"MEMORY_CLAMPED_TO_BUDGET",
requested_mb=round(to_reserve / 1024 / 1024, 2),
limit_mb=round(self._limit_bytes / 1024 / 1024, 2),
)
to_reserve = self._limit_bytes

async with self._condition:
deadline = asyncio.get_event_loop().time() + BACKPRESSURE_TIMEOUT
Expand Down Expand Up @@ -164,22 +166,20 @@ async def release(self, bytes_reserved: int) -> None:
def estimate_memory_footprint(method: str, content_length: int) -> int:
"""Estimate memory needed for a request.

Small PUTs buffer the whole body + ciphertext. Larger PUTs stream and buffer
one internal part at a time, so reserve exactly that internal part size --
the same value the upload path uses (memory_bounded_part_size). This keeps
the reservation honest: the limiter then admits only as many concurrent
uploads as actually fit the budget, instead of under-counting and OOMing.
GETs reserve a baseline here; encrypted GETs acquire additional memory in the handler.
PUTs stream and encrypt one internal part at a time; reserve the framed
path's true peak (streaming_upload_peak), which stacks the accumulated
ciphertext, the encrypt transient, the held frame and the HTTP body copy --
not just the part size. Reserving the bare part size under-counted ~3x and
let the limiter admit too many concurrent uploads -> OOM. GETs reserve a
baseline; encrypted GETs acquire more in the handler.
"""
if method in ("HEAD", "DELETE"):
return 0
if method == "GET":
return MAX_BUFFER_SIZE
if method == "POST":
return MIN_RESERVATION
if content_length <= MAX_BUFFER_SIZE:
return max(MIN_RESERVATION, content_length * 2)
return memory_bounded_part_size(content_length)
return max(MIN_RESERVATION, streaming_upload_peak(content_length))


# Module-level convenience functions delegating to the default instance
Expand Down
22 changes: 22 additions & 0 deletions s3proxy/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,28 @@ def memory_bounded_part_size(
return -(-content_length // parts) # ceil: even split, never exceeds `parts`


def streaming_upload_peak(content_length: int) -> int:
"""Peak memory the framed UploadPart path holds for one in-flight request.

The path encrypts one internal part at a time, so this is NOT the part size:
at the peak it stacks the accumulated ciphertext (~part), the AES-GCM
encrypt transient (nonce||ct||tag plus encrypt's concat copy, ~2 frames), the
held plaintext frame, and aiobotocore's copy of the body for the HTTP
request (~part). ``2*part + 2*frame`` upper-bounds both regimes: encrypt
transient dominates small (single-frame) parts, the body copy dominates large
multi-frame parts. Measured peaks (tracemalloc): 16MB part -> 24.5MB (bound
32MB), 512MB client part / 25.6MB internal -> 56.1MB (bound 67MB). Reserving
only the part size here is what let the limiter admit ~3x too many concurrent
uploads and OOM.
"""
# ponytail: bound, not exact. Ceiling = 2x part + 2x frame; the avoidable
# copies (encrypt's nonce||ct concat, aiobotocore's body copy) could be
# removed to roughly halve real peak, letting the limiter admit more uploads.
part = memory_bounded_part_size(content_length)
frame = min(part, FRAME_PLAINTEXT_SIZE)
return 2 * part + 2 * frame


@dataclass(slots=True)
class EncryptedData:
"""Container for encrypted data and metadata."""
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Unit-test fixtures."""

import pytest

import s3proxy.concurrency as concurrency


@pytest.fixture(autouse=True)
def no_backpressure_wait():
"""Reject immediately instead of waiting out the backpressure timeout.

The rejection tests fill the budget to capacity and then try to acquire more,
which can never succeed -- with the production timeout each such test sleeps
the full BACKPRESSURE_TIMEOUT before asserting SlowDown. None of the in-process
unit tests exercise wait-then-succeed backpressure, so a 0 timeout gives the
same SlowDown with no wall-clock wait. (Patched at runtime, not via env, so it
doesn't depend on import order and never leaks past a test.)
"""
original = concurrency.BACKPRESSURE_TIMEOUT
concurrency.BACKPRESSURE_TIMEOUT = 0
yield
concurrency.BACKPRESSURE_TIMEOUT = original
31 changes: 17 additions & 14 deletions tests/unit/test_concurrency_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,48 +323,51 @@ async def test_release_memory_never_negative(self):
assert concurrency_module.get_active_memory() == 0

def test_estimate_memory_footprint_put_small(self):
"""PUT with small file should use content_length * 2."""
"""Small PUT reserves the framed path's peak (streaming_upload_peak)."""
import s3proxy.concurrency as concurrency_module
from s3proxy import crypto

# 1MB file → 2MB footprint
footprint = concurrency_module.estimate_memory_footprint("PUT", 1 * 1024 * 1024)
assert footprint == 2 * 1024 * 1024
assert footprint == crypto.streaming_upload_peak(1 * 1024 * 1024)

def test_estimate_memory_footprint_put_large(self):
"""Large PUT reserves the real internal-part buffer the upload holds."""
"""Large PUT reserves the framed path's real peak, above the part size."""
import s3proxy.concurrency as concurrency_module
from s3proxy import crypto

for mb in (50, 100, 512, 1024):
cl = mb * 1024 * 1024
footprint = concurrency_module.estimate_memory_footprint("PUT", cl)
assert footprint == crypto.memory_bounded_part_size(cl)
assert footprint == crypto.streaming_upload_peak(cl)
assert footprint > crypto.memory_bounded_part_size(cl)

def test_large_uploads_bounded_below_pod_memory(self):
"""Regression for the barman OOM. Two linked invariants:
"""Regression for the barman/ES OOM. Linked invariants:
1. an internal part never expands beyond the per-client allocation range
(or part numbers collide) -- for ANY client part size, and
2. the reservation tracks the real buffer, so admitted x footprint never
exceeds the budget (limiter guarantee), and barman-scale parts admit
only ~2 concurrent (the old flat-16MB estimate admitted ~4 -> OOM).
(or part numbers collide) -- for ANY client part size,
2. the reservation is the framed path's real peak (> part size), so the
limiter can't under-count and admit too many uploads, and
3. total admitted memory never exceeds the budget (limiter guarantee).
"""
import s3proxy.concurrency as concurrency_module
from s3proxy import crypto
from s3proxy.state import MAX_INTERNAL_PARTS_PER_CLIENT

budget = concurrency_module.get_memory_limit()
budget = concurrency_module.get_memory_limit() # deployed 64MB
for mb in (50, 128, 320, 512, 1024, 4096):
cl = mb * 1024 * 1024
part = crypto.memory_bounded_part_size(cl)
internal_parts = -(-cl // part)
assert internal_parts <= MAX_INTERNAL_PARTS_PER_CLIENT, "would collide part numbers"
footprint = concurrency_module.estimate_memory_footprint("PUT", cl)
assert footprint > part, "reservation must exceed the bare part size"
# limiter guarantee: total admitted memory never exceeds the budget
assert (budget // footprint) * footprint <= budget

# barman-scale parts: bounded to ~2 concurrent on the default 64MB budget
footprint_512 = concurrency_module.estimate_memory_footprint("PUT", 512 * 1024 * 1024)
assert budget // footprint_512 <= 2
# The real workload is 16MB ES parts: honest reservation fits the 64MB
# budget with room for concurrency (the under-count admitted ~8 -> OOM).
es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024)
assert es <= budget // 2

def test_estimate_memory_footprint_get(self):
"""GET should always use fixed buffer size."""
Expand Down
120 changes: 60 additions & 60 deletions tests/unit/test_memory_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
at 733 bytes) should not be treated the same as large uploads (100MB+).

Memory estimation logic:
- PUT ≤8MB: content_length * 2 (body + ciphertext buffer)
- PUT >8MB: memory_bounded_part_size(content_length) (the one internal part the
streaming/framed upload path actually buffers at a time)
- PUT: streaming_upload_peak(content_length) -- the framed upload path's true
peak (accumulated ciphertext + encrypt transient + held frame + HTTP body
copy), NOT the bare internal-part size. Reserving the part size under-counted
~3x and let the limiter admit too many concurrent uploads -> OOM.
- GET: MAX_BUFFER_SIZE (8MB baseline, handler acquires more for encrypted decrypts)
- POST: MIN_RESERVATION (64KB, metadata only)
- HEAD/DELETE: 0 (no buffering, bypass limit)
Expand Down Expand Up @@ -35,29 +36,32 @@ def reset_state(self):
yield
concurrency_module.reset_state()

def test_small_file_uses_content_length_x2(self):
"""PUT with 1KB file should reserve 2KB (content_length * 2)."""
def test_small_file_reserves_streaming_peak(self):
"""Tiny PUTs floor at MIN_RESERVATION; small ones reserve the framed peak."""
import s3proxy.concurrency as concurrency_module
from s3proxy import crypto

footprint = concurrency_module.estimate_memory_footprint("PUT", 1024)
# 1KB * 2 = 2KB, but minimum is 64KB
assert footprint == concurrency_module.MIN_RESERVATION

# With 100KB file: 100KB * 2 = 200KB
# 1KB peak is tiny -> floored at MIN_RESERVATION (64KB)
assert concurrency_module.estimate_memory_footprint("PUT", 1024) == (
concurrency_module.MIN_RESERVATION
)
# 100KB part stays single-frame: peak = 2*part + 2*frame = 4*100KB
footprint = concurrency_module.estimate_memory_footprint("PUT", 100 * 1024)
assert footprint == 200 * 1024
assert footprint == crypto.streaming_upload_peak(100 * 1024) == 400 * 1024

def test_large_file_reserves_real_internal_part(self):
"""Large PUTs must reserve the actual internal-part buffer the upload
path holds (memory_bounded_part_size), not a flat guess -- otherwise the
limiter under-counts and admits too many concurrent uploads (the OOM)."""
def test_large_file_reserves_framed_peak_not_part_size(self):
"""Large PUTs must reserve the framed path's true peak, NOT the bare
internal-part size -- reserving the part size under-counted ~3x and let
the limiter admit too many concurrent uploads (the OOM). The peak must
strictly exceed the part size."""
import s3proxy.concurrency as concurrency_module
from s3proxy import crypto

for mb in (50, 100, 512, 1024):
cl = mb * 1024 * 1024
footprint = concurrency_module.estimate_memory_footprint("PUT", cl)
assert footprint == crypto.memory_bounded_part_size(cl)
assert footprint == crypto.streaming_upload_peak(cl)
assert footprint > crypto.memory_bounded_part_size(cl)

def test_minimum_reservation_enforced(self):
"""0-byte file should still reserve MIN_RESERVATION (64KB)."""
Expand Down Expand Up @@ -151,16 +155,20 @@ async def test_memory_released_on_completion(self):
assert concurrency_module.get_active_memory() == 0

@pytest.mark.asyncio
async def test_single_request_cannot_exceed_budget(self):
"""A single 100MB request should be rejected when it exceeds the 64MB budget."""
async def test_single_request_larger_than_budget_runs_exclusively(self):
"""A request whose footprint exceeds the whole budget is NOT rejected -- it
is clamped to the budget and runs exclusively (concurrency 1). The proxy
streams it fine; the budget bounds concurrency, not one request's peak.
(Rejecting it would refuse large uploads under a tight budget -- the
integration OOM test uploads 30MB at a 16MB budget and must succeed.)
"""
import s3proxy.concurrency as concurrency_module
from s3proxy.errors import S3Error

# Request 100MB — exceeds 64MB limit, should be rejected immediately
with pytest.raises(S3Error, match="503"):
await concurrency_module.try_acquire_memory(100 * 1024 * 1024)

# No memory should be reserved after rejection
limit = concurrency_module.get_memory_limit()
reserved = await concurrency_module.try_acquire_memory(100 * 1024 * 1024)
assert reserved == limit # clamped to the whole budget
assert concurrency_module.get_active_memory() == limit
await concurrency_module.release_memory(reserved)
assert concurrency_module.get_active_memory() == 0

@pytest.mark.asyncio
Expand Down Expand Up @@ -263,50 +271,42 @@ async def test_elasticsearch_shard_backup_scenario(self):

@pytest.mark.asyncio
async def test_mixed_workload_scenario(self):
"""Simulate mixed workload: small files + streaming uploads."""
"""Mixed workload at the deployed 64MB budget: the real 16MB ES snapshot
uploads (framed) plus small metadata files, then full."""
import s3proxy.concurrency as concurrency_module
from s3proxy.errors import S3Error
from s3proxy import crypto

concurrency_module.set_memory_limit(64) # deployed governor budget
limit = 64 * 1024 * 1024
reservations = []

# 2 large streaming uploads (320MB -> 16MB internal part each = 32MB)
for _ in range(2):
footprint = concurrency_module.estimate_memory_footprint("PUT", 320 * 1024 * 1024)
assert footprint == 16 * 1024 * 1024 # one 16MB internal part
reserved = await concurrency_module.try_acquire_memory(footprint)
reservations.append(reserved)

# 2 GET requests (8MB each = 16MB)
for _ in range(2):
footprint = concurrency_module.estimate_memory_footprint("GET", 0)
assert footprint == 8 * 1024 * 1024
reserved = await concurrency_module.try_acquire_memory(footprint)
reservations.append(reserved)

# Total: 48MB used, 16MB remaining
assert concurrency_module.get_active_memory() == 48 * 1024 * 1024

# Calculate how many small files fit in remaining 16MB budget
# Each small file reserves MIN_RESERVATION (64KB = 65536 bytes)
remaining_budget = 64 * 1024 * 1024 - 48 * 1024 * 1024 # 16MB
files_that_fit = remaining_budget // concurrency_module.MIN_RESERVATION # 256 files

small_reservations = []
for _ in range(files_that_fit):
# 16MB ES part: the framed path's real peak (streaming_upload_peak) is well
# above the bare 8MB internal-part size -- the under-count that caused the
# OOM. One such reservation is half the budget, so two run concurrently.
es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024)
assert es == crypto.streaming_upload_peak(16 * 1024 * 1024)
assert es == limit // 2
reservations.append(await concurrency_module.try_acquire_memory(es))

used = es
assert concurrency_module.get_active_memory() == used

# Small metadata files share the remaining budget. The remaining 32MB fits
# hundreds of MIN_RESERVATION files; acquire a handful to confirm they
# coexist with the ES part (full saturation is covered elsewhere).
remaining_budget = limit - used
assert remaining_budget // concurrency_module.MIN_RESERVATION >= 8
for _ in range(8):
footprint = concurrency_module.estimate_memory_footprint("PUT", 1024)
reserved = await concurrency_module.try_acquire_memory(footprint)
small_reservations.append(reserved)
assert footprint == concurrency_module.MIN_RESERVATION
reservations.append(await concurrency_module.try_acquire_memory(footprint))

# Now at 64MB (48MB large + 256 * 64KB = 16MB small)
expected_total = 48 * 1024 * 1024 + files_that_fit * concurrency_module.MIN_RESERVATION
assert concurrency_module.get_active_memory() == expected_total

# Next request should fail
with pytest.raises(S3Error):
await concurrency_module.try_acquire_memory(concurrency_module.MIN_RESERVATION)
assert (
concurrency_module.get_active_memory() == used + 8 * concurrency_module.MIN_RESERVATION
)

# Clean up
for r in reservations + small_reservations:
for r in reservations:
await concurrency_module.release_memory(r)

assert concurrency_module.get_active_memory() == 0
Expand Down
Loading