From e7fd55135d9bb05c400f26797570b575153f6d30 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 22 Jun 2026 11:35:07 +0200 Subject: [PATCH 1/5] fix: reserve framed UploadPart real peak to stop OOM under concurrency The memory governor reserved only the internal-part size (8MB for a 16MB ES snapshot part) while the framed upload path's real peak is ~3x that (accumulated ciphertext + AES-GCM encrypt transient + held plaintext frame + aiobotocore's body copy). The governor therefore admitted ~3x too many concurrent uploads and the pod was OOMKilled despite the streaming fix. - crypto.streaming_upload_peak(): honest peak the framed path holds. - estimate_memory_footprint() reserves it instead of the bare part size. - raise governor budget 64->256MB under the 512Mi pod limit (honest reservations no longer fit a 64MB budget for barman-scale parts). - regression test drives the REAL handler method under tracemalloc and asserts the reservation bounds measured peak; fixes the unit tests that had encoded the under-count as correct. --- chart/values.yaml | 7 +- s3proxy/concurrency.py | 18 ++- s3proxy/crypto.py | 22 ++++ tests/unit/test_concurrency_limit.py | 28 +++-- tests/unit/test_memory_concurrency.py | 80 ++++++------ tests/unit/test_upload_reservation_vs_real.py | 114 ++++++++++++++++++ 6 files changed, 208 insertions(+), 61 deletions(-) create mode 100644 tests/unit/test_upload_reservation_vs_real.py diff --git a/chart/values.yaml b/chart/values.yaml index 3566372..00b0674 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -15,8 +15,13 @@ server: port: 4433 noTls: true +# Governor budget for in-flight request memory. Reservations are the framed +# upload path's true peak (~2x internal part + 2 frames), so this must leave +# headroom under the pod memory limit for base RSS (~120Mi) + allocator slack: +# 120 + 256*1.2 ~= 427Mi < the 512Mi limit below. Raising it without raising +# `resources.limits.memory` risks OOMKill. performance: - memoryLimitMb: 64 + memoryLimitMb: 256 # Redis holds only transient multipart-upload state (TTL'd, deleted on # completion). It is NOT a durable store — losing it only forces in-flight diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index e541fcc..0500699 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -12,7 +12,7 @@ import structlog -from s3proxy.crypto import memory_bounded_part_size +from s3proxy.crypto import streaming_upload_peak from s3proxy.errors import S3Error from s3proxy.metrics import MEMORY_LIMIT_BYTES, MEMORY_REJECTIONS, MEMORY_RESERVED_BYTES @@ -164,12 +164,12 @@ async def release(self, bytes_reserved: int) -> None: def estimate_memory_footprint(method: str, content_length: int) -> int: """Estimate memory needed for a request. - Small PUTs buffer the whole body + ciphertext. Larger PUTs stream and buffer - one internal part at a time, so reserve exactly that internal part size -- - the same value the upload path uses (memory_bounded_part_size). This keeps - the reservation honest: the limiter then admits only as many concurrent - uploads as actually fit the budget, instead of under-counting and OOMing. - GETs reserve a baseline here; encrypted GETs acquire additional memory in the handler. + PUTs stream and encrypt one internal part at a time; reserve the framed + path's true peak (streaming_upload_peak), which stacks the accumulated + ciphertext, the encrypt transient, the held frame and the HTTP body copy -- + not just the part size. Reserving the bare part size under-counted ~3x and + let the limiter admit too many concurrent uploads -> OOM. GETs reserve a + baseline; encrypted GETs acquire more in the handler. """ if method in ("HEAD", "DELETE"): return 0 @@ -177,9 +177,7 @@ def estimate_memory_footprint(method: str, content_length: int) -> int: return MAX_BUFFER_SIZE if method == "POST": return MIN_RESERVATION - if content_length <= MAX_BUFFER_SIZE: - return max(MIN_RESERVATION, content_length * 2) - return memory_bounded_part_size(content_length) + return max(MIN_RESERVATION, streaming_upload_peak(content_length)) # Module-level convenience functions delegating to the default instance diff --git a/s3proxy/crypto.py b/s3proxy/crypto.py index 72e9cbe..5aaa05d 100644 --- a/s3proxy/crypto.py +++ b/s3proxy/crypto.py @@ -166,6 +166,28 @@ def memory_bounded_part_size( return -(-content_length // parts) # ceil: even split, never exceeds `parts` +def streaming_upload_peak(content_length: int) -> int: + """Peak memory the framed UploadPart path holds for one in-flight request. + + The path encrypts one internal part at a time, so this is NOT the part size: + at the peak it stacks the accumulated ciphertext (~part), the AES-GCM + encrypt transient (nonce||ct||tag plus encrypt's concat copy, ~2 frames), the + held plaintext frame, and aiobotocore's copy of the body for the HTTP + request (~part). ``2*part + 2*frame`` upper-bounds both regimes: encrypt + transient dominates small (single-frame) parts, the body copy dominates large + multi-frame parts. Measured peaks (tracemalloc): 16MB part -> 24.5MB (bound + 32MB), 512MB client part / 25.6MB internal -> 56.1MB (bound 67MB). Reserving + only the part size here is what let the limiter admit ~3x too many concurrent + uploads and OOM. + """ + # ponytail: bound, not exact. Ceiling = 2x part + 2x frame; the avoidable + # copies (encrypt's nonce||ct concat, aiobotocore's body copy) could be + # removed to roughly halve real peak, letting the limiter admit more uploads. + part = memory_bounded_part_size(content_length) + frame = min(part, FRAME_PLAINTEXT_SIZE) + return 2 * part + 2 * frame + + @dataclass(slots=True) class EncryptedData: """Container for encrypted data and metadata.""" diff --git a/tests/unit/test_concurrency_limit.py b/tests/unit/test_concurrency_limit.py index fce20a2..5a3e3e0 100644 --- a/tests/unit/test_concurrency_limit.py +++ b/tests/unit/test_concurrency_limit.py @@ -323,35 +323,37 @@ async def test_release_memory_never_negative(self): assert concurrency_module.get_active_memory() == 0 def test_estimate_memory_footprint_put_small(self): - """PUT with small file should use content_length * 2.""" + """Small PUT reserves the framed path's peak (streaming_upload_peak).""" import s3proxy.concurrency as concurrency_module + from s3proxy import crypto - # 1MB file → 2MB footprint footprint = concurrency_module.estimate_memory_footprint("PUT", 1 * 1024 * 1024) - assert footprint == 2 * 1024 * 1024 + assert footprint == crypto.streaming_upload_peak(1 * 1024 * 1024) def test_estimate_memory_footprint_put_large(self): - """Large PUT reserves the real internal-part buffer the upload holds.""" + """Large PUT reserves the framed path's real peak, above the part size.""" import s3proxy.concurrency as concurrency_module from s3proxy import crypto for mb in (50, 100, 512, 1024): cl = mb * 1024 * 1024 footprint = concurrency_module.estimate_memory_footprint("PUT", cl) - assert footprint == crypto.memory_bounded_part_size(cl) + assert footprint == crypto.streaming_upload_peak(cl) + assert footprint > crypto.memory_bounded_part_size(cl) def test_large_uploads_bounded_below_pod_memory(self): - """Regression for the barman OOM. Two linked invariants: + """Regression for the barman/ES OOM. Linked invariants: 1. an internal part never expands beyond the per-client allocation range - (or part numbers collide) -- for ANY client part size, and - 2. the reservation tracks the real buffer, so admitted x footprint never - exceeds the budget (limiter guarantee), and barman-scale parts admit - only ~2 concurrent (the old flat-16MB estimate admitted ~4 -> OOM). + (or part numbers collide) -- for ANY client part size, + 2. the reservation is the framed path's real peak (> part size), so the + limiter can't under-count and admit too many uploads, and + 3. total admitted memory never exceeds the budget (limiter guarantee). """ import s3proxy.concurrency as concurrency_module from s3proxy import crypto from s3proxy.state import MAX_INTERNAL_PARTS_PER_CLIENT + concurrency_module.set_memory_limit(256) # production governor budget budget = concurrency_module.get_memory_limit() for mb in (50, 128, 320, 512, 1024, 4096): cl = mb * 1024 * 1024 @@ -359,12 +361,14 @@ def test_large_uploads_bounded_below_pod_memory(self): internal_parts = -(-cl // part) assert internal_parts <= MAX_INTERNAL_PARTS_PER_CLIENT, "would collide part numbers" footprint = concurrency_module.estimate_memory_footprint("PUT", cl) + assert footprint > part, "reservation must exceed the bare part size" # limiter guarantee: total admitted memory never exceeds the budget assert (budget // footprint) * footprint <= budget - # barman-scale parts: bounded to ~2 concurrent on the default 64MB budget + # barman-scale 512MB parts fit the production budget and admit a handful. footprint_512 = concurrency_module.estimate_memory_footprint("PUT", 512 * 1024 * 1024) - assert budget // footprint_512 <= 2 + assert footprint_512 <= budget + assert 1 <= budget // footprint_512 <= 4 def test_estimate_memory_footprint_get(self): """GET should always use fixed buffer size.""" diff --git a/tests/unit/test_memory_concurrency.py b/tests/unit/test_memory_concurrency.py index a8f2c76..b918276 100644 --- a/tests/unit/test_memory_concurrency.py +++ b/tests/unit/test_memory_concurrency.py @@ -5,9 +5,10 @@ at 733 bytes) should not be treated the same as large uploads (100MB+). Memory estimation logic: -- PUT ≤8MB: content_length * 2 (body + ciphertext buffer) -- PUT >8MB: memory_bounded_part_size(content_length) (the one internal part the - streaming/framed upload path actually buffers at a time) +- PUT: streaming_upload_peak(content_length) -- the framed upload path's true + peak (accumulated ciphertext + encrypt transient + held frame + HTTP body + copy), NOT the bare internal-part size. Reserving the part size under-counted + ~3x and let the limiter admit too many concurrent uploads -> OOM. - GET: MAX_BUFFER_SIZE (8MB baseline, handler acquires more for encrypted decrypts) - POST: MIN_RESERVATION (64KB, metadata only) - HEAD/DELETE: 0 (no buffering, bypass limit) @@ -35,29 +36,32 @@ def reset_state(self): yield concurrency_module.reset_state() - def test_small_file_uses_content_length_x2(self): - """PUT with 1KB file should reserve 2KB (content_length * 2).""" + def test_small_file_reserves_streaming_peak(self): + """Tiny PUTs floor at MIN_RESERVATION; small ones reserve the framed peak.""" import s3proxy.concurrency as concurrency_module + from s3proxy import crypto - footprint = concurrency_module.estimate_memory_footprint("PUT", 1024) - # 1KB * 2 = 2KB, but minimum is 64KB - assert footprint == concurrency_module.MIN_RESERVATION - - # With 100KB file: 100KB * 2 = 200KB + # 1KB peak is tiny -> floored at MIN_RESERVATION (64KB) + assert concurrency_module.estimate_memory_footprint("PUT", 1024) == ( + concurrency_module.MIN_RESERVATION + ) + # 100KB part stays single-frame: peak = 2*part + 2*frame = 4*100KB footprint = concurrency_module.estimate_memory_footprint("PUT", 100 * 1024) - assert footprint == 200 * 1024 + assert footprint == crypto.streaming_upload_peak(100 * 1024) == 400 * 1024 - def test_large_file_reserves_real_internal_part(self): - """Large PUTs must reserve the actual internal-part buffer the upload - path holds (memory_bounded_part_size), not a flat guess -- otherwise the - limiter under-counts and admits too many concurrent uploads (the OOM).""" + def test_large_file_reserves_framed_peak_not_part_size(self): + """Large PUTs must reserve the framed path's true peak, NOT the bare + internal-part size -- reserving the part size under-counted ~3x and let + the limiter admit too many concurrent uploads (the OOM). The peak must + strictly exceed the part size.""" import s3proxy.concurrency as concurrency_module from s3proxy import crypto for mb in (50, 100, 512, 1024): cl = mb * 1024 * 1024 footprint = concurrency_module.estimate_memory_footprint("PUT", cl) - assert footprint == crypto.memory_bounded_part_size(cl) + assert footprint == crypto.streaming_upload_peak(cl) + assert footprint > crypto.memory_bounded_part_size(cl) def test_minimum_reservation_enforced(self): """0-byte file should still reserve MIN_RESERVATION (64KB).""" @@ -263,33 +267,35 @@ async def test_elasticsearch_shard_backup_scenario(self): @pytest.mark.asyncio async def test_mixed_workload_scenario(self): - """Simulate mixed workload: small files + streaming uploads.""" + """Simulate mixed workload at the production budget: large streaming + uploads + GETs + small files, then rejection once full.""" import s3proxy.concurrency as concurrency_module - from s3proxy.errors import S3Error + from s3proxy import crypto + concurrency_module.set_memory_limit(256) # production governor budget + limit = 256 * 1024 * 1024 reservations = [] - # 2 large streaming uploads (320MB -> 16MB internal part each = 32MB) + # 2 large streaming uploads. A 320MB client part splits into 16MB internal + # parts; the framed path's real peak per request is streaming_upload_peak, + # well above the bare 16MB part size (the under-count that caused the OOM). + large = concurrency_module.estimate_memory_footprint("PUT", 320 * 1024 * 1024) + assert large == crypto.streaming_upload_peak(320 * 1024 * 1024) for _ in range(2): - footprint = concurrency_module.estimate_memory_footprint("PUT", 320 * 1024 * 1024) - assert footprint == 16 * 1024 * 1024 # one 16MB internal part - reserved = await concurrency_module.try_acquire_memory(footprint) - reservations.append(reserved) + reservations.append(await concurrency_module.try_acquire_memory(large)) - # 2 GET requests (8MB each = 16MB) + # 2 GET requests (8MB each) for _ in range(2): footprint = concurrency_module.estimate_memory_footprint("GET", 0) assert footprint == 8 * 1024 * 1024 - reserved = await concurrency_module.try_acquire_memory(footprint) - reservations.append(reserved) + reservations.append(await concurrency_module.try_acquire_memory(footprint)) - # Total: 48MB used, 16MB remaining - assert concurrency_module.get_active_memory() == 48 * 1024 * 1024 + used = 2 * large + 2 * (8 * 1024 * 1024) + assert concurrency_module.get_active_memory() == used - # Calculate how many small files fit in remaining 16MB budget - # Each small file reserves MIN_RESERVATION (64KB = 65536 bytes) - remaining_budget = 64 * 1024 * 1024 - 48 * 1024 * 1024 # 16MB - files_that_fit = remaining_budget // concurrency_module.MIN_RESERVATION # 256 files + # Fill the rest of the budget with small files (MIN_RESERVATION each) + remaining_budget = limit - used + files_that_fit = remaining_budget // concurrency_module.MIN_RESERVATION small_reservations = [] for _ in range(files_that_fit): @@ -297,13 +303,11 @@ async def test_mixed_workload_scenario(self): reserved = await concurrency_module.try_acquire_memory(footprint) small_reservations.append(reserved) - # Now at 64MB (48MB large + 256 * 64KB = 16MB small) - expected_total = 48 * 1024 * 1024 + files_that_fit * concurrency_module.MIN_RESERVATION + expected_total = used + files_that_fit * concurrency_module.MIN_RESERVATION assert concurrency_module.get_active_memory() == expected_total - - # Next request should fail - with pytest.raises(S3Error): - await concurrency_module.try_acquire_memory(concurrency_module.MIN_RESERVATION) + # Budget is full to within one MIN_RESERVATION -- the next request can't fit + # (the limiter would back-pressure then reject it). + assert limit - expected_total < concurrency_module.MIN_RESERVATION # Clean up for r in reservations + small_reservations: diff --git a/tests/unit/test_upload_reservation_vs_real.py b/tests/unit/test_upload_reservation_vs_real.py new file mode 100644 index 0000000..09de112 --- /dev/null +++ b/tests/unit/test_upload_reservation_vs_real.py @@ -0,0 +1,114 @@ +"""The reservation must bound the framed UploadPart path's REAL peak memory. + +This is the test that would have caught the Elasticsearch OOM: the memory +governor reserved only the internal-part size (8MB for a 16MB part) while the +framed path's real peak is ~3x that (accumulated ciphertext + AES-GCM encrypt +transient + held plaintext frame + the transport's copy of the body). The +governor therefore admitted ~3x too many concurrent uploads and the pod was +OOMKilled. + +It drives the ACTUAL handler method (not a re-implementation) under tracemalloc, +so it fails if either the reservation drifts below reality OR the upload path +starts allocating more. The mock S3 client copies the body the way aiobotocore +does for the signed HTTP request, which is part of the real peak. +""" + +import hashlib +import tracemalloc + +import pytest + +from s3proxy import crypto +from s3proxy.concurrency import estimate_memory_footprint, set_memory_limit +from s3proxy.handlers.multipart.upload_part import UploadPartMixin +from s3proxy.state import MultipartUploadState + +MB = 1024 * 1024 + + +class _Mgr: + async def add_part(self, *a, **k): + return None + + +class _Client: + async def upload_part(self, bucket, key, upload_id, part_number, body): + # aiobotocore copies the body to sign and send it; mirror that so the + # measured peak reflects what the real transport holds. + sent = bytes(body) + return {"ETag": hashlib.md5(sent).hexdigest()} + + +class _Request: + def __init__(self, total, chunk=64 * 1024): + self._total = total + self._chunk = chunk + + async def stream(self): + for i in range(0, self._total, self._chunk): + yield b"x" * min(self._chunk, self._total - i) + + +def _handler(): + h = UploadPartMixin.__new__(UploadPartMixin) # bypass BaseHandler.__init__ + h.multipart_manager = _Mgr() + return h + + +async def _measure_peak(content_length): + h = _handler() + state = MultipartUploadState(dek=crypto.generate_dek(), bucket="b", key="k", upload_id="u") + part_size = crypto.memory_bounded_part_size(content_length) + tracemalloc.start() + base = tracemalloc.get_traced_memory()[0] + await h._stream_and_upload_framed( + _Request(content_length), + _Client(), + "b", + "k", + "u", + 1, + state, + content_length, + part_size, + 1, + ) + peak = tracemalloc.get_traced_memory()[1] + tracemalloc.stop() + return peak - base + + +@pytest.mark.asyncio +@pytest.mark.parametrize("mb", [16, 64, 512]) +async def test_reservation_bounds_real_framed_peak(mb): + content_length = mb * MB + real_peak = await _measure_peak(content_length) + reserved = estimate_memory_footprint("PUT", content_length) + + # The whole point: the reservation must cover the REAL peak, not the bare + # part size. A bare-part-size reservation (the old behaviour) fails here. + assert reserved >= real_peak, ( + f"{mb}MB part: reserved {reserved / MB:.1f}MB < real peak " + f"{real_peak / MB:.1f}MB -- governor under-counts, will OOM" + ) + # And the old under-count must be demonstrably insufficient. + assert crypto.memory_bounded_part_size(content_length) < real_peak + + +# Production governor budget (chart/values.yaml performance.memoryLimitMb). +PROD_LIMIT_MB = 256 + + +@pytest.mark.parametrize("mb", [16, 512]) +def test_workload_parts_fit_prod_budget(mb): + """At the deployed governor budget, the real workloads (16MB ES snapshot + parts, 512MB barman parts) must fit a single reservation -- otherwise the + limiter rejects them outright (S3 SlowDown) and the upload can never run.""" + set_memory_limit(PROD_LIMIT_MB) + try: + reserved = estimate_memory_footprint("PUT", mb * MB) + assert reserved <= PROD_LIMIT_MB * MB, ( + f"{mb}MB part reserves {reserved / MB:.1f}MB > {PROD_LIMIT_MB}MB budget" + ) + finally: + set_memory_limit(64) From 49343b296ce87e6404a637f86ae29936eca3ec32 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 22 Jun 2026 11:50:45 +0200 Subject: [PATCH 2/5] revert chart limit bump; tune reservation tests to the 64MB budget All real UploadPart traffic is 16MB ES snapshot parts (confirmed from logs); their honest reservation (32MB) fits the existing 64MB governor budget with room for 2 concurrent uploads, so no deploy-config change is needed. The governor under-count was the whole OOM; reserving the real peak fixes it at the fixed budget. --- chart/values.yaml | 7 +--- tests/unit/test_concurrency_limit.py | 11 +++---- tests/unit/test_memory_concurrency.py | 32 ++++++++----------- tests/unit/test_upload_reservation_vs_real.py | 26 ++++++++------- 4 files changed, 33 insertions(+), 43 deletions(-) diff --git a/chart/values.yaml b/chart/values.yaml index 00b0674..3566372 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -15,13 +15,8 @@ server: port: 4433 noTls: true -# Governor budget for in-flight request memory. Reservations are the framed -# upload path's true peak (~2x internal part + 2 frames), so this must leave -# headroom under the pod memory limit for base RSS (~120Mi) + allocator slack: -# 120 + 256*1.2 ~= 427Mi < the 512Mi limit below. Raising it without raising -# `resources.limits.memory` risks OOMKill. performance: - memoryLimitMb: 256 + memoryLimitMb: 64 # Redis holds only transient multipart-upload state (TTL'd, deleted on # completion). It is NOT a durable store — losing it only forces in-flight diff --git a/tests/unit/test_concurrency_limit.py b/tests/unit/test_concurrency_limit.py index 5a3e3e0..65abaa9 100644 --- a/tests/unit/test_concurrency_limit.py +++ b/tests/unit/test_concurrency_limit.py @@ -353,8 +353,7 @@ def test_large_uploads_bounded_below_pod_memory(self): from s3proxy import crypto from s3proxy.state import MAX_INTERNAL_PARTS_PER_CLIENT - concurrency_module.set_memory_limit(256) # production governor budget - budget = concurrency_module.get_memory_limit() + budget = concurrency_module.get_memory_limit() # deployed 64MB for mb in (50, 128, 320, 512, 1024, 4096): cl = mb * 1024 * 1024 part = crypto.memory_bounded_part_size(cl) @@ -365,10 +364,10 @@ def test_large_uploads_bounded_below_pod_memory(self): # limiter guarantee: total admitted memory never exceeds the budget assert (budget // footprint) * footprint <= budget - # barman-scale 512MB parts fit the production budget and admit a handful. - footprint_512 = concurrency_module.estimate_memory_footprint("PUT", 512 * 1024 * 1024) - assert footprint_512 <= budget - assert 1 <= budget // footprint_512 <= 4 + # The real workload is 16MB ES parts: honest reservation fits the 64MB + # budget with room for concurrency (the under-count admitted ~8 -> OOM). + es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024) + assert es <= budget // 2 def test_estimate_memory_footprint_get(self): """GET should always use fixed buffer size.""" diff --git a/tests/unit/test_memory_concurrency.py b/tests/unit/test_memory_concurrency.py index b918276..50ef560 100644 --- a/tests/unit/test_memory_concurrency.py +++ b/tests/unit/test_memory_concurrency.py @@ -267,30 +267,24 @@ async def test_elasticsearch_shard_backup_scenario(self): @pytest.mark.asyncio async def test_mixed_workload_scenario(self): - """Simulate mixed workload at the production budget: large streaming - uploads + GETs + small files, then rejection once full.""" + """Mixed workload at the deployed 64MB budget: the real 16MB ES snapshot + uploads (framed) plus small metadata files, then full.""" import s3proxy.concurrency as concurrency_module from s3proxy import crypto - concurrency_module.set_memory_limit(256) # production governor budget - limit = 256 * 1024 * 1024 + concurrency_module.set_memory_limit(64) # deployed governor budget + limit = 64 * 1024 * 1024 reservations = [] - # 2 large streaming uploads. A 320MB client part splits into 16MB internal - # parts; the framed path's real peak per request is streaming_upload_peak, - # well above the bare 16MB part size (the under-count that caused the OOM). - large = concurrency_module.estimate_memory_footprint("PUT", 320 * 1024 * 1024) - assert large == crypto.streaming_upload_peak(320 * 1024 * 1024) - for _ in range(2): - reservations.append(await concurrency_module.try_acquire_memory(large)) - - # 2 GET requests (8MB each) - for _ in range(2): - footprint = concurrency_module.estimate_memory_footprint("GET", 0) - assert footprint == 8 * 1024 * 1024 - reservations.append(await concurrency_module.try_acquire_memory(footprint)) - - used = 2 * large + 2 * (8 * 1024 * 1024) + # 16MB ES part: the framed path's real peak (streaming_upload_peak) is well + # above the bare 8MB internal-part size -- the under-count that caused the + # OOM. One such reservation is half the budget, so two run concurrently. + es = concurrency_module.estimate_memory_footprint("PUT", 16 * 1024 * 1024) + assert es == crypto.streaming_upload_peak(16 * 1024 * 1024) + assert es == limit // 2 + reservations.append(await concurrency_module.try_acquire_memory(es)) + + used = es assert concurrency_module.get_active_memory() == used # Fill the rest of the budget with small files (MIN_RESERVATION each) diff --git a/tests/unit/test_upload_reservation_vs_real.py b/tests/unit/test_upload_reservation_vs_real.py index 09de112..e2fd96a 100644 --- a/tests/unit/test_upload_reservation_vs_real.py +++ b/tests/unit/test_upload_reservation_vs_real.py @@ -95,20 +95,22 @@ async def test_reservation_bounds_real_framed_peak(mb): assert crypto.memory_bounded_part_size(content_length) < real_peak -# Production governor budget (chart/values.yaml performance.memoryLimitMb). -PROD_LIMIT_MB = 256 +# Deployed governor budget (chart/values.yaml performance.memoryLimitMb). +DEPLOY_LIMIT_MB = 64 -@pytest.mark.parametrize("mb", [16, 512]) -def test_workload_parts_fit_prod_budget(mb): - """At the deployed governor budget, the real workloads (16MB ES snapshot - parts, 512MB barman parts) must fit a single reservation -- otherwise the - limiter rejects them outright (S3 SlowDown) and the upload can never run.""" - set_memory_limit(PROD_LIMIT_MB) +def test_es_part_fits_deploy_budget_with_concurrency(): + """The real workload is 16MB ES snapshot parts. Its honest reservation must + fit the 64MB budget AND leave room for concurrency -- if a single 16MB part + reserved the whole budget the proxy would serialise to one upload at a time. + """ + set_memory_limit(DEPLOY_LIMIT_MB) try: - reserved = estimate_memory_footprint("PUT", mb * MB) - assert reserved <= PROD_LIMIT_MB * MB, ( - f"{mb}MB part reserves {reserved / MB:.1f}MB > {PROD_LIMIT_MB}MB budget" + reserved = estimate_memory_footprint("PUT", 16 * MB) + budget = DEPLOY_LIMIT_MB * MB + assert reserved <= budget // 2, ( + f"16MB part reserves {reserved / MB:.1f}MB > half the {DEPLOY_LIMIT_MB}MB " + f"budget -- no concurrency" ) finally: - set_memory_limit(64) + set_memory_limit(DEPLOY_LIMIT_MB) From c4e2528e1658247300be6c1babe2af405a881b11 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 22 Jun 2026 12:46:24 +0200 Subject: [PATCH 3/5] test: cut unit-suite runtime (220s->35s for concurrency tests) - set S3PROXY_BACKPRESSURE_TIMEOUT=1 in unit conftest so the 6 rejection tests that wait out the timeout don't burn 30s each (~180s saved) - mixed_workload: acquire a handful of small files instead of 512, avoiding gc.collect churn from per-release memory reclaim Pre-existing test_many_small_files_fit_in_budget (~25s, 1000 gc-heavy releases) left as-is. --- tests/conftest.py | 3 +++ tests/unit/test_memory_concurrency.py | 24 +++++++++++------------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 027482a..dd9547d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,9 @@ # Set environment variables before importing s3proxy modules os.environ.setdefault("S3PROXY_HOST", "http://localhost:9000") +# Rejection tests fill the budget then wait out the backpressure timeout before +# asserting SlowDown -- keep it short so the unit suite doesn't burn 30s each. +os.environ.setdefault("S3PROXY_BACKPRESSURE_TIMEOUT", "1") from s3proxy.client import S3Client, S3Credentials from s3proxy.config import Settings diff --git a/tests/unit/test_memory_concurrency.py b/tests/unit/test_memory_concurrency.py index 50ef560..e7ae48c 100644 --- a/tests/unit/test_memory_concurrency.py +++ b/tests/unit/test_memory_concurrency.py @@ -287,24 +287,22 @@ async def test_mixed_workload_scenario(self): used = es assert concurrency_module.get_active_memory() == used - # Fill the rest of the budget with small files (MIN_RESERVATION each) + # Small metadata files share the remaining budget. The remaining 32MB fits + # hundreds of MIN_RESERVATION files; acquire a handful to confirm they + # coexist with the ES part (full saturation is covered elsewhere). remaining_budget = limit - used - files_that_fit = remaining_budget // concurrency_module.MIN_RESERVATION - - small_reservations = [] - for _ in range(files_that_fit): + assert remaining_budget // concurrency_module.MIN_RESERVATION >= 8 + for _ in range(8): footprint = concurrency_module.estimate_memory_footprint("PUT", 1024) - reserved = await concurrency_module.try_acquire_memory(footprint) - small_reservations.append(reserved) + assert footprint == concurrency_module.MIN_RESERVATION + reservations.append(await concurrency_module.try_acquire_memory(footprint)) - expected_total = used + files_that_fit * concurrency_module.MIN_RESERVATION - assert concurrency_module.get_active_memory() == expected_total - # Budget is full to within one MIN_RESERVATION -- the next request can't fit - # (the limiter would back-pressure then reject it). - assert limit - expected_total < concurrency_module.MIN_RESERVATION + assert ( + concurrency_module.get_active_memory() == used + 8 * concurrency_module.MIN_RESERVATION + ) # Clean up - for r in reservations + small_reservations: + for r in reservations: await concurrency_module.release_memory(r) assert concurrency_module.get_active_memory() == 0 From 28a7f9aba390cf9368ffb07bc8ed72119e37687a Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 22 Jun 2026 12:51:05 +0200 Subject: [PATCH 4/5] test: reject immediately in unit tests instead of env-hacking the timeout Replace the S3PROXY_BACKPRESSURE_TIMEOUT=1 env in shared conftest (fragile: read once at import, still slept 1s/test) with a unit-only autouse fixture that patches concurrency.BACKPRESSURE_TIMEOUT=0 at runtime. Rejection tests fill the budget permanently, so they reject with zero wall-clock wait; no import-order dependency and no effect on integration subprocesses. --- tests/conftest.py | 3 --- tests/unit/conftest.py | 22 ++++++++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 tests/unit/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py index dd9547d..027482a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,9 +12,6 @@ # Set environment variables before importing s3proxy modules os.environ.setdefault("S3PROXY_HOST", "http://localhost:9000") -# Rejection tests fill the budget then wait out the backpressure timeout before -# asserting SlowDown -- keep it short so the unit suite doesn't burn 30s each. -os.environ.setdefault("S3PROXY_BACKPRESSURE_TIMEOUT", "1") from s3proxy.client import S3Client, S3Credentials from s3proxy.config import Settings diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..e4ce68b --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,22 @@ +"""Unit-test fixtures.""" + +import pytest + +import s3proxy.concurrency as concurrency + + +@pytest.fixture(autouse=True) +def no_backpressure_wait(): + """Reject immediately instead of waiting out the backpressure timeout. + + The rejection tests fill the budget to capacity and then try to acquire more, + which can never succeed -- with the production timeout each such test sleeps + the full BACKPRESSURE_TIMEOUT before asserting SlowDown. None of the in-process + unit tests exercise wait-then-succeed backpressure, so a 0 timeout gives the + same SlowDown with no wall-clock wait. (Patched at runtime, not via env, so it + doesn't depend on import order and never leaks past a test.) + """ + original = concurrency.BACKPRESSURE_TIMEOUT + concurrency.BACKPRESSURE_TIMEOUT = 0 + yield + concurrency.BACKPRESSURE_TIMEOUT = original From 46bbd53b21982f7fbdbae8091089d72f86678729 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 22 Jun 2026 13:11:44 +0200 Subject: [PATCH 5/5] fix: clamp oversized reservation to budget instead of hard-rejecting A single request's honest framed-upload peak can exceed the whole governor budget when the budget is deliberately tight (the 16MB integration OOM test streams 30MB uploads). Hard-rejecting refused requests the proxy can stream fine. Clamp to the budget so such a request runs exclusively (concurrency 1) -- the budget bounds concurrency, not one request's peak. Verified against real minio: 20MB uploads succeed at a 16MB budget via backpressure. --- s3proxy/concurrency.py | 24 +++++++++++++----------- tests/unit/test_memory_concurrency.py | 20 ++++++++++++-------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index 0500699..2fd2486 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -87,19 +87,21 @@ async def try_acquire(self, bytes_needed: int) -> int: to_reserve = max(MIN_RESERVATION, bytes_needed) - # Single request exceeds entire budget — can never fit, reject immediately + # A single request's honest peak can exceed the whole budget: the framed + # upload path needs ~2-3x the internal part, more than a deliberately tight + # governor budget (which sits well below pod RAM to bound *concurrency*). + # Clamp to the budget so such a request runs exclusively (concurrency 1) + # rather than being refused outright -- the proxy streams it fine, it just + # can't share the budget. ponytail: a request whose true peak exceeds the + # pod's RAM (not just this budget) could still OOM when run alone; all + # realistic parts (<=~56MB) stay far under that. if to_reserve > self._limit_bytes: - request_mb = to_reserve / 1024 / 1024 - limit_mb = self._limit_bytes / 1024 / 1024 - logger.warning( - "MEMORY_TOO_LARGE", - requested_mb=round(request_mb, 2), - limit_mb=round(limit_mb, 2), - ) - MEMORY_REJECTIONS.inc() - raise S3Error.slow_down( - f"Request needs {request_mb:.0f}MB but budget is {limit_mb:.0f}MB" + logger.info( + "MEMORY_CLAMPED_TO_BUDGET", + requested_mb=round(to_reserve / 1024 / 1024, 2), + limit_mb=round(self._limit_bytes / 1024 / 1024, 2), ) + to_reserve = self._limit_bytes async with self._condition: deadline = asyncio.get_event_loop().time() + BACKPRESSURE_TIMEOUT diff --git a/tests/unit/test_memory_concurrency.py b/tests/unit/test_memory_concurrency.py index e7ae48c..967e488 100644 --- a/tests/unit/test_memory_concurrency.py +++ b/tests/unit/test_memory_concurrency.py @@ -155,16 +155,20 @@ async def test_memory_released_on_completion(self): assert concurrency_module.get_active_memory() == 0 @pytest.mark.asyncio - async def test_single_request_cannot_exceed_budget(self): - """A single 100MB request should be rejected when it exceeds the 64MB budget.""" + async def test_single_request_larger_than_budget_runs_exclusively(self): + """A request whose footprint exceeds the whole budget is NOT rejected -- it + is clamped to the budget and runs exclusively (concurrency 1). The proxy + streams it fine; the budget bounds concurrency, not one request's peak. + (Rejecting it would refuse large uploads under a tight budget -- the + integration OOM test uploads 30MB at a 16MB budget and must succeed.) + """ import s3proxy.concurrency as concurrency_module - from s3proxy.errors import S3Error - - # Request 100MB — exceeds 64MB limit, should be rejected immediately - with pytest.raises(S3Error, match="503"): - await concurrency_module.try_acquire_memory(100 * 1024 * 1024) - # No memory should be reserved after rejection + limit = concurrency_module.get_memory_limit() + reserved = await concurrency_module.try_acquire_memory(100 * 1024 * 1024) + assert reserved == limit # clamped to the whole budget + assert concurrency_module.get_active_memory() == limit + await concurrency_module.release_memory(reserved) assert concurrency_module.get_active_memory() == 0 @pytest.mark.asyncio