diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index e541fcc..2fd2486 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -12,7 +12,7 @@ import structlog -from s3proxy.crypto import memory_bounded_part_size +from s3proxy.crypto import streaming_upload_peak from s3proxy.errors import S3Error from s3proxy.metrics import MEMORY_LIMIT_BYTES, MEMORY_REJECTIONS, MEMORY_RESERVED_BYTES @@ -87,19 +87,21 @@ async def try_acquire(self, bytes_needed: int) -> int: to_reserve = max(MIN_RESERVATION, bytes_needed) - # Single request exceeds entire budget — can never fit, reject immediately + # A single request's honest peak can exceed the whole budget: the framed + # upload path needs ~2-3x the internal part, more than a deliberately tight + # governor budget (which sits well below pod RAM to bound *concurrency*). + # Clamp to the budget so such a request runs exclusively (concurrency 1) + # rather than being refused outright -- the proxy streams it fine, it just + # can't share the budget. ponytail: a request whose true peak exceeds the + # pod's RAM (not just this budget) could still OOM when run alone; all + # realistic parts (<=~56MB) stay far under that. if to_reserve > self._limit_bytes: - request_mb = to_reserve / 1024 / 1024 - limit_mb = self._limit_bytes / 1024 / 1024 - logger.warning( - "MEMORY_TOO_LARGE", - requested_mb=round(request_mb, 2), - limit_mb=round(limit_mb, 2), - ) - MEMORY_REJECTIONS.inc() - raise S3Error.slow_down( - f"Request needs {request_mb:.0f}MB but budget is {limit_mb:.0f}MB" + logger.info( + "MEMORY_CLAMPED_TO_BUDGET", + requested_mb=round(to_reserve / 1024 / 1024, 2), + limit_mb=round(self._limit_bytes / 1024 / 1024, 2), ) + to_reserve = self._limit_bytes async with self._condition: deadline = asyncio.get_event_loop().time() + BACKPRESSURE_TIMEOUT @@ -164,12 +166,12 @@ async def release(self, bytes_reserved: int) -> None: def estimate_memory_footprint(method: str, content_length: int) -> int: """Estimate memory needed for a request. - Small PUTs buffer the whole body + ciphertext. Larger PUTs stream and buffer - one internal part at a time, so reserve exactly that internal part size -- - the same value the upload path uses (memory_bounded_part_size). This keeps - the reservation honest: the limiter then admits only as many concurrent - uploads as actually fit the budget, instead of under-counting and OOMing. - GETs reserve a baseline here; encrypted GETs acquire additional memory in the handler. + PUTs stream and encrypt one internal part at a time; reserve the framed + path's true peak (streaming_upload_peak), which stacks the accumulated + ciphertext, the encrypt transient, the held frame and the HTTP body copy -- + not just the part size. Reserving the bare part size under-counted ~3x and + let the limiter admit too many concurrent uploads -> OOM. GETs reserve a + baseline; encrypted GETs acquire more in the handler. """ if method in ("HEAD", "DELETE"): return 0 @@ -177,9 +179,7 @@ def estimate_memory_footprint(method: str, content_length: int) -> int: return MAX_BUFFER_SIZE if method == "POST": return MIN_RESERVATION - if content_length <= MAX_BUFFER_SIZE: - return max(MIN_RESERVATION, content_length * 2) - return memory_bounded_part_size(content_length) + return max(MIN_RESERVATION, streaming_upload_peak(content_length)) # Module-level convenience functions delegating to the default instance diff --git a/s3proxy/crypto.py b/s3proxy/crypto.py index 72e9cbe..5aaa05d 100644 --- a/s3proxy/crypto.py +++ b/s3proxy/crypto.py @@ -166,6 +166,28 @@ def memory_bounded_part_size( return -(-content_length // parts) # ceil: even split, never exceeds `parts` +def streaming_upload_peak(content_length: int) -> int: + """Peak memory the framed UploadPart path holds for one in-flight request. + + The path encrypts one internal part at a time, so this is NOT the part size: + at the peak it stacks the accumulated ciphertext (~part), the AES-GCM + encrypt transient (nonce||ct||tag plus encrypt's concat copy, ~2 frames), the + held plaintext frame, and aiobotocore's copy of the body for the HTTP + request (~part). ``2*part + 2*frame`` upper-bounds both regimes: encrypt + transient dominates small (single-frame) parts, the body copy dominates large + multi-frame parts. Measured peaks (tracemalloc): 16MB part -> 24.5MB (bound + 32MB), 512MB client part / 25.6MB internal -> 56.1MB (bound 67MB). Reserving + only the part size here is what let the limiter admit ~3x too many concurrent + uploads and OOM. + """ + # ponytail: bound, not exact. Ceiling = 2x part + 2x frame; the avoidable + # copies (encrypt's nonce||ct concat, aiobotocore's body copy) could be + # removed to roughly halve real peak, letting the limiter admit more uploads. + part = memory_bounded_part_size(content_length) + frame = min(part, FRAME_PLAINTEXT_SIZE) + return 2 * part + 2 * frame + + @dataclass(slots=True) class EncryptedData: """Container for encrypted data and metadata.""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..e4ce68b --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,22 @@ +"""Unit-test fixtures.""" + +import pytest + +import s3proxy.concurrency as concurrency + + +@pytest.fixture(autouse=True) +def no_backpressure_wait(): + """Reject immediately instead of waiting out the backpressure timeout. + + The rejection tests fill the budget to capacity and then try to acquire more, + which can never succeed -- with the production timeout each such test sleeps + the full BACKPRESSURE_TIMEOUT before asserting SlowDown. None of the in-process + unit tests exercise wait-then-succeed backpressure, so a 0 timeout gives the + same SlowDown with no wall-clock wait. (Patched at runtime, not via env, so it + doesn't depend on import order and never leaks past a test.) + """ + original = concurrency.BACKPRESSURE_TIMEOUT + concurrency.BACKPRESSURE_TIMEOUT = 0 + yield + concurrency.BACKPRESSURE_TIMEOUT = original diff --git a/tests/unit/test_concurrency_limit.py b/tests/unit/test_concurrency_limit.py index fce20a2..65abaa9 100644 --- a/tests/unit/test_concurrency_limit.py +++ b/tests/unit/test_concurrency_limit.py @@ -323,48 +323,51 @@ async def test_release_memory_never_negative(self): assert concurrency_module.get_active_memory() == 0 def test_estimate_memory_footprint_put_small(self): - """PUT with small file should use content_length * 2.""" + """Small PUT reserves the framed path's peak (streaming_upload_peak).""" import s3proxy.concurrency as concurrency_module + from s3proxy import crypto - # 1MB file → 2MB footprint footprint = concurrency_module.estimate_memory_footprint("PUT", 1 * 1024 * 1024) - assert footprint == 2 * 1024 * 1024 + assert footprint == crypto.streaming_upload_peak(1 * 1024 * 1024) def test_estimate_memory_footprint_put_large(self): - """Large PUT reserves the real internal-part buffer the upload holds.""" + """Large PUT reserves the framed path's real peak, above the part size.""" import s3proxy.concurrency as concurrency_module from s3proxy import crypto for mb in (50, 100, 512, 1024): cl = mb * 1024 * 1024 footprint = concurrency_module.estimate_memory_footprint("PUT", cl) - assert footprint == crypto.memory_bounded_part_size(cl) + assert footprint == crypto.streaming_upload_peak(cl) + assert footprint > crypto.memory_bounded_part_size(cl) def test_large_uploads_bounded_below_pod_memory(self): - """Regression for the barman OOM. Two linked invariants: + """Regression for the barman/ES OOM. Linked invariants: 1. an internal part never expands beyond the per-client allocation range - (or part numbers collide) -- for ANY client part size, and - 2. the reservation tracks the real buffer, so admitted x footprint never - exceeds the budget (limiter guarantee), and barman-scale parts admit - only ~2 concurrent (the old flat-16MB estimate admitted ~4 -> OOM). + (or part numbers collide) -- for ANY client part size, + 2. the reservation is the framed path's real peak (> part size), so the + limiter can't under-count and admit too many uploads, and + 3. total admitted memory never exceeds the budget (limiter guarantee). """ import s3proxy.concurrency as concurrency_module from s3proxy import crypto from s3proxy.state import MAX_INTERNAL_PARTS_PER_CLIENT - budget = concurrency_module.get_memory_limit() + budget = concurrency_module.get_memory_limit() # deployed 64MB for mb in (50, 128, 320, 512, 1024, 4096): cl = mb * 1024 * 1024 part = crypto.memory_bounded_part_size(cl) internal_parts = -(-cl // part) assert internal_parts <= MAX_INTERNAL_PARTS_PER_CLIENT, "would collide part numbers" footprint = concurrency_module.estimate_memory_footprint("PUT", cl) + assert footprint > part, "reservation must exceed the bare part size" # limiter guarantee: total admitted memory never exceeds the budget assert (budget // footprint) * footprint <= budget - # barman-scale parts: bounded to ~2 concurrent on the default 64MB budget - footprint_512 = concurrency_module.estimate_memory_footprint("PUT", 512 * 1024 * 1024) - assert budget // footprint_512 <= 2 + # The real workload is 16MB ES parts: honest reservation fits the 64MB + # budget with room for concurrency (the under-count admitted ~8 -> OOM). + es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024) + assert es <= budget // 2 def test_estimate_memory_footprint_get(self): """GET should always use fixed buffer size.""" diff --git a/tests/unit/test_memory_concurrency.py b/tests/unit/test_memory_concurrency.py index a8f2c76..967e488 100644 --- a/tests/unit/test_memory_concurrency.py +++ b/tests/unit/test_memory_concurrency.py @@ -5,9 +5,10 @@ at 733 bytes) should not be treated the same as large uploads (100MB+). Memory estimation logic: -- PUT ≤8MB: content_length * 2 (body + ciphertext buffer) -- PUT >8MB: memory_bounded_part_size(content_length) (the one internal part the - streaming/framed upload path actually buffers at a time) +- PUT: streaming_upload_peak(content_length) -- the framed upload path's true + peak (accumulated ciphertext + encrypt transient + held frame + HTTP body + copy), NOT the bare internal-part size. Reserving the part size under-counted + ~3x and let the limiter admit too many concurrent uploads -> OOM. - GET: MAX_BUFFER_SIZE (8MB baseline, handler acquires more for encrypted decrypts) - POST: MIN_RESERVATION (64KB, metadata only) - HEAD/DELETE: 0 (no buffering, bypass limit) @@ -35,29 +36,32 @@ def reset_state(self): yield concurrency_module.reset_state() - def test_small_file_uses_content_length_x2(self): - """PUT with 1KB file should reserve 2KB (content_length * 2).""" + def test_small_file_reserves_streaming_peak(self): + """Tiny PUTs floor at MIN_RESERVATION; small ones reserve the framed peak.""" import s3proxy.concurrency as concurrency_module + from s3proxy import crypto - footprint = concurrency_module.estimate_memory_footprint("PUT", 1024) - # 1KB * 2 = 2KB, but minimum is 64KB - assert footprint == concurrency_module.MIN_RESERVATION - - # With 100KB file: 100KB * 2 = 200KB + # 1KB peak is tiny -> floored at MIN_RESERVATION (64KB) + assert concurrency_module.estimate_memory_footprint("PUT", 1024) == ( + concurrency_module.MIN_RESERVATION + ) + # 100KB part stays single-frame: peak = 2*part + 2*frame = 4*100KB footprint = concurrency_module.estimate_memory_footprint("PUT", 100 * 1024) - assert footprint == 200 * 1024 + assert footprint == crypto.streaming_upload_peak(100 * 1024) == 400 * 1024 - def test_large_file_reserves_real_internal_part(self): - """Large PUTs must reserve the actual internal-part buffer the upload - path holds (memory_bounded_part_size), not a flat guess -- otherwise the - limiter under-counts and admits too many concurrent uploads (the OOM).""" + def test_large_file_reserves_framed_peak_not_part_size(self): + """Large PUTs must reserve the framed path's true peak, NOT the bare + internal-part size -- reserving the part size under-counted ~3x and let + the limiter admit too many concurrent uploads (the OOM). The peak must + strictly exceed the part size.""" import s3proxy.concurrency as concurrency_module from s3proxy import crypto for mb in (50, 100, 512, 1024): cl = mb * 1024 * 1024 footprint = concurrency_module.estimate_memory_footprint("PUT", cl) - assert footprint == crypto.memory_bounded_part_size(cl) + assert footprint == crypto.streaming_upload_peak(cl) + assert footprint > crypto.memory_bounded_part_size(cl) def test_minimum_reservation_enforced(self): """0-byte file should still reserve MIN_RESERVATION (64KB).""" @@ -151,16 +155,20 @@ async def test_memory_released_on_completion(self): assert concurrency_module.get_active_memory() == 0 @pytest.mark.asyncio - async def test_single_request_cannot_exceed_budget(self): - """A single 100MB request should be rejected when it exceeds the 64MB budget.""" + async def test_single_request_larger_than_budget_runs_exclusively(self): + """A request whose footprint exceeds the whole budget is NOT rejected -- it + is clamped to the budget and runs exclusively (concurrency 1). The proxy + streams it fine; the budget bounds concurrency, not one request's peak. + (Rejecting it would refuse large uploads under a tight budget -- the + integration OOM test uploads 30MB at a 16MB budget and must succeed.) + """ import s3proxy.concurrency as concurrency_module - from s3proxy.errors import S3Error - - # Request 100MB — exceeds 64MB limit, should be rejected immediately - with pytest.raises(S3Error, match="503"): - await concurrency_module.try_acquire_memory(100 * 1024 * 1024) - # No memory should be reserved after rejection + limit = concurrency_module.get_memory_limit() + reserved = await concurrency_module.try_acquire_memory(100 * 1024 * 1024) + assert reserved == limit # clamped to the whole budget + assert concurrency_module.get_active_memory() == limit + await concurrency_module.release_memory(reserved) assert concurrency_module.get_active_memory() == 0 @pytest.mark.asyncio @@ -263,50 +271,42 @@ async def test_elasticsearch_shard_backup_scenario(self): @pytest.mark.asyncio async def test_mixed_workload_scenario(self): - """Simulate mixed workload: small files + streaming uploads.""" + """Mixed workload at the deployed 64MB budget: the real 16MB ES snapshot + uploads (framed) plus small metadata files, then full.""" import s3proxy.concurrency as concurrency_module - from s3proxy.errors import S3Error + from s3proxy import crypto + concurrency_module.set_memory_limit(64) # deployed governor budget + limit = 64 * 1024 * 1024 reservations = [] - # 2 large streaming uploads (320MB -> 16MB internal part each = 32MB) - for _ in range(2): - footprint = concurrency_module.estimate_memory_footprint("PUT", 320 * 1024 * 1024) - assert footprint == 16 * 1024 * 1024 # one 16MB internal part - reserved = await concurrency_module.try_acquire_memory(footprint) - reservations.append(reserved) - - # 2 GET requests (8MB each = 16MB) - for _ in range(2): - footprint = concurrency_module.estimate_memory_footprint("GET", 0) - assert footprint == 8 * 1024 * 1024 - reserved = await concurrency_module.try_acquire_memory(footprint) - reservations.append(reserved) - - # Total: 48MB used, 16MB remaining - assert concurrency_module.get_active_memory() == 48 * 1024 * 1024 - - # Calculate how many small files fit in remaining 16MB budget - # Each small file reserves MIN_RESERVATION (64KB = 65536 bytes) - remaining_budget = 64 * 1024 * 1024 - 48 * 1024 * 1024 # 16MB - files_that_fit = remaining_budget // concurrency_module.MIN_RESERVATION # 256 files - - small_reservations = [] - for _ in range(files_that_fit): + # 16MB ES part: the framed path's real peak (streaming_upload_peak) is well + # above the bare 8MB internal-part size -- the under-count that caused the + # OOM. One such reservation is half the budget, so two run concurrently. + es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024) + assert es == crypto.streaming_upload_peak(16 * 1024 * 1024) + assert es == limit // 2 + reservations.append(await concurrency_module.try_acquire_memory(es)) + + used = es + assert concurrency_module.get_active_memory() == used + + # Small metadata files share the remaining budget. The remaining 32MB fits + # hundreds of MIN_RESERVATION files; acquire a handful to confirm they + # coexist with the ES part (full saturation is covered elsewhere). + remaining_budget = limit - used + assert remaining_budget // concurrency_module.MIN_RESERVATION >= 8 + for _ in range(8): footprint = concurrency_module.estimate_memory_footprint("PUT", 1024) - reserved = await concurrency_module.try_acquire_memory(footprint) - small_reservations.append(reserved) + assert footprint == concurrency_module.MIN_RESERVATION + reservations.append(await concurrency_module.try_acquire_memory(footprint)) - # Now at 64MB (48MB large + 256 * 64KB = 16MB small) - expected_total = 48 * 1024 * 1024 + files_that_fit * concurrency_module.MIN_RESERVATION - assert concurrency_module.get_active_memory() == expected_total - - # Next request should fail - with pytest.raises(S3Error): - await concurrency_module.try_acquire_memory(concurrency_module.MIN_RESERVATION) + assert ( + concurrency_module.get_active_memory() == used + 8 * concurrency_module.MIN_RESERVATION + ) # Clean up - for r in reservations + small_reservations: + for r in reservations: await concurrency_module.release_memory(r) assert concurrency_module.get_active_memory() == 0 diff --git a/tests/unit/test_upload_reservation_vs_real.py b/tests/unit/test_upload_reservation_vs_real.py new file mode 100644 index 0000000..e2fd96a --- /dev/null +++ b/tests/unit/test_upload_reservation_vs_real.py @@ -0,0 +1,116 @@ +"""The reservation must bound the framed UploadPart path's REAL peak memory. + +This is the test that would have caught the Elasticsearch OOM: the memory +governor reserved only the internal-part size (8MB for a 16MB part) while the +framed path's real peak is ~3x that (accumulated ciphertext + AES-GCM encrypt +transient + held plaintext frame + the transport's copy of the body). The +governor therefore admitted ~3x too many concurrent uploads and the pod was +OOMKilled. + +It drives the ACTUAL handler method (not a re-implementation) under tracemalloc, +so it fails if either the reservation drifts below reality OR the upload path +starts allocating more. The mock S3 client copies the body the way aiobotocore +does for the signed HTTP request, which is part of the real peak. +""" + +import hashlib +import tracemalloc + +import pytest + +from s3proxy import crypto +from s3proxy.concurrency import estimate_memory_footprint, set_memory_limit +from s3proxy.handlers.multipart.upload_part import UploadPartMixin +from s3proxy.state import MultipartUploadState + +MB = 1024 * 1024 + + +class _Mgr: + async def add_part(self, *a, **k): + return None + + +class _Client: + async def upload_part(self, bucket, key, upload_id, part_number, body): + # aiobotocore copies the body to sign and send it; mirror that so the + # measured peak reflects what the real transport holds. + sent = bytes(body) + return {"ETag": hashlib.md5(sent).hexdigest()} + + +class _Request: + def __init__(self, total, chunk=64 * 1024): + self._total = total + self._chunk = chunk + + async def stream(self): + for i in range(0, self._total, self._chunk): + yield b"x" * min(self._chunk, self._total - i) + + +def _handler(): + h = UploadPartMixin.__new__(UploadPartMixin) # bypass BaseHandler.__init__ + h.multipart_manager = _Mgr() + return h + + +async def _measure_peak(content_length): + h = _handler() + state = MultipartUploadState(dek=crypto.generate_dek(), bucket="b", key="k", upload_id="u") + part_size = crypto.memory_bounded_part_size(content_length) + tracemalloc.start() + base = tracemalloc.get_traced_memory()[0] + await h._stream_and_upload_framed( + _Request(content_length), + _Client(), + "b", + "k", + "u", + 1, + state, + content_length, + part_size, + 1, + ) + peak = tracemalloc.get_traced_memory()[1] + tracemalloc.stop() + return peak - base + + +@pytest.mark.asyncio +@pytest.mark.parametrize("mb", [16, 64, 512]) +async def test_reservation_bounds_real_framed_peak(mb): + content_length = mb * MB + real_peak = await _measure_peak(content_length) + reserved = estimate_memory_footprint("PUT", content_length) + + # The whole point: the reservation must cover the REAL peak, not the bare + # part size. A bare-part-size reservation (the old behaviour) fails here. + assert reserved >= real_peak, ( + f"{mb}MB part: reserved {reserved / MB:.1f}MB < real peak " + f"{real_peak / MB:.1f}MB -- governor under-counts, will OOM" + ) + # And the old under-count must be demonstrably insufficient. + assert crypto.memory_bounded_part_size(content_length) < real_peak + + +# Deployed governor budget (chart/values.yaml performance.memoryLimitMb). +DEPLOY_LIMIT_MB = 64 + + +def test_es_part_fits_deploy_budget_with_concurrency(): + """The real workload is 16MB ES snapshot parts. Its honest reservation must + fit the 64MB budget AND leave room for concurrency -- if a single 16MB part + reserved the whole budget the proxy would serialise to one upload at a time. + """ + set_memory_limit(DEPLOY_LIMIT_MB) + try: + reserved = estimate_memory_footprint("PUT", 16 * MB) + budget = DEPLOY_LIMIT_MB * MB + assert reserved <= budget // 2, ( + f"16MB part reserves {reserved / MB:.1f}MB > half the {DEPLOY_LIMIT_MB}MB " + f"budget -- no concurrency" + ) + finally: + set_memory_limit(DEPLOY_LIMIT_MB)