From ceefc474d0236102a44adc5c38a289ce8daa3dbe Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Wed, 24 Jun 2026 11:27:37 +0200 Subject: [PATCH 1/2] fix: parallelize per-object HEAD on list-objects to stop recursive-list stalls ListObjects resolved the SSE plaintext size/etag with one sequential head_object per key. A recursive (empty-delimiter) list of up to max-keys objects stacked into a multi-second stall that tripped client timeouts, hanging ClickHouse/Postgres remote backups at the S3 list step. Run the HEADs concurrently, bounded by LIST_HEAD_CONCURRENCY (50), preserving output order and the per-object fallback to the listed size/etag. --- s3proxy/handlers/buckets.py | 51 +++++---- tests/unit/test_list_objects_parallel.py | 128 +++++++++++++++++++++++ 2 files changed, 157 insertions(+), 22 deletions(-) create mode 100644 tests/unit/test_list_objects_parallel.py diff --git a/s3proxy/handlers/buckets.py b/s3proxy/handlers/buckets.py index 41d7647..fef9b07 100644 --- a/s3proxy/handlers/buckets.py +++ b/s3proxy/handlers/buckets.py @@ -20,6 +20,9 @@ logger: BoundLogger = structlog.get_logger() +# Max concurrent HEAD requests when resolving plaintext sizes for a list page. +LIST_HEAD_CONCURRENCY = 50 + def _strip_minio_cache_suffix(value: str | None) -> str | None: """Strip MinIO cache metadata suffix from marker/token values. 
@@ -170,28 +173,32 @@ def _is_internal_key(self, key: str) -> bool: ) async def _process_list_objects(self, client, bucket: str, contents: list[dict]) -> list[dict]: - objects = [] - for obj in contents: - if self._is_internal_key(obj["Key"]): - continue - try: - head = await client.head_object(bucket, obj["Key"]) - meta = head.get("Metadata", {}) - size = self._get_plaintext_size(meta, obj.get("Size", 0)) - etag = self._get_effective_etag(meta, obj.get("ETag", "")) - except Exception: - size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"') - - objects.append( - { - "key": obj["Key"], - "last_modified": obj["LastModified"].isoformat(), - "etag": etag, - "size": size, - "storage_class": obj.get("StorageClass", "STANDARD"), - } - ) - return objects + # One HEAD per object is needed to recover the SSE plaintext size/etag. + # Run them concurrently (bounded) — sequential HEADs on a recursive list + # of up to max-keys objects stack into a multi-second stall that trips + # client timeouts. NOTE: fixed fan-out cap; raise LIST_HEAD_CONCURRENCY + # if backend HEAD latency dominates large pages. 
+ listable = [obj for obj in contents if not self._is_internal_key(obj["Key"])] + sem = asyncio.Semaphore(LIST_HEAD_CONCURRENCY) + + async def resolve(obj: dict) -> dict: + async with sem: + try: + head = await client.head_object(bucket, obj["Key"]) + meta = head.get("Metadata", {}) + size = self._get_plaintext_size(meta, obj.get("Size", 0)) + etag = self._get_effective_etag(meta, obj.get("ETag", "")) + except Exception: + size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"') + return { + "key": obj["Key"], + "last_modified": obj["LastModified"].isoformat(), + "etag": etag, + "size": size, + "storage_class": obj.get("StorageClass", "STANDARD"), + } + + return await asyncio.gather(*[resolve(obj) for obj in listable]) async def handle_create_bucket(self, request: Request, creds: S3Credentials) -> Response: bucket = self._parse_bucket(request.url.path) diff --git a/tests/unit/test_list_objects_parallel.py b/tests/unit/test_list_objects_parallel.py new file mode 100644 index 0000000..af06c75 --- /dev/null +++ b/tests/unit/test_list_objects_parallel.py @@ -0,0 +1,128 @@ +"""Self-check for the parallel HEAD fan-out in BucketHandlerMixin._process_list_objects. + +Sequential HEADs on a recursive list stack into a client-timeout-tripping stall. +This proves: HEADs run concurrently, output order matches input order, internal +keys are skipped, and a failing HEAD falls back to the listed size/etag. + +The repo's package __init__ chain currently pulls in modules with pre-existing +Py2 `except A, B:` syntax (utils.py, dashboard/*) that won't import under Py3, +so we load buckets.py directly with stubbed siblings to exercise the real code. 
+""" + +import asyncio +import datetime as dt +import importlib.util +import sys +import types +from pathlib import Path + +REPO = Path(__file__).resolve().parents[2] + + +def _load_buckets(): + def stub(name, **attrs): + m = types.ModuleType(name) + for k, v in attrs.items(): + setattr(m, k, v) + sys.modules[name] = m + return m + + stub("s3proxy") + stub("s3proxy.handlers") + stub("s3proxy.xml_responses") + stub("s3proxy.client", S3Credentials=object) + stub("s3proxy.errors", S3Error=type("S3Error", (Exception,), {})) + stub( + "s3proxy.state", + INTERNAL_PREFIX="s3proxy-internal/", + META_SUFFIX_LEGACY=".s3proxy-meta", + delete_multipart_metadata=lambda *a, **k: None, + ) + stub("s3proxy.xml_utils", find_element=lambda *a, **k: None, find_elements=lambda *a, **k: []) + stub("s3proxy.handlers.base", BaseHandler=object) + + spec = importlib.util.spec_from_file_location( + "s3proxy.handlers.buckets", REPO / "s3proxy" / "handlers" / "buckets.py" + ) + mod = importlib.util.module_from_spec(spec) + sys.modules["s3proxy.handlers.buckets"] = mod + spec.loader.exec_module(mod) + return mod + + +buckets = _load_buckets() +INTERNAL_PREFIX = "s3proxy-internal/" + + +class FakeHandler: + _process_list_objects = buckets.BucketHandlerMixin._process_list_objects + + def __init__(self): + self.inflight = 0 + self.max_inflight = 0 + + def _is_internal_key(self, key): + return key.startswith(INTERNAL_PREFIX) + + def _get_plaintext_size(self, meta, fallback): + return int(meta.get("plaintext-size", fallback)) + + def _get_effective_etag(self, meta, fallback): + return meta.get("client-etag", fallback.strip('"')) + + +class FakeClient: + def __init__(self, handler, fail_key=None): + self.handler = handler + self.fail_key = fail_key + + async def head_object(self, bucket, key): + self.handler.inflight += 1 + self.handler.max_inflight = max(self.handler.max_inflight, self.handler.inflight) + try: + await asyncio.sleep(0.02) # simulate backend round-trip + if key == self.fail_key: + 
raise RuntimeError("backend HEAD failed") + return {"Metadata": {"plaintext-size": "111", "client-etag": f"etag-{key}"}} + finally: + self.handler.inflight -= 1 + + +def _obj(key, size=999): + return { + "Key": key, + "Size": size, + "ETag": '"raw-etag"', + "LastModified": dt.datetime(2026, 6, 24, 9, 0, 0), + "StorageClass": "STANDARD", + } + + +def test_parallel_order_and_fallback(): + handler = FakeHandler() + client = FakeClient(handler, fail_key="b") + contents = [ + _obj("a"), + _obj(f"{INTERNAL_PREFIX}skip-me"), # internal -> dropped + _obj("b", size=42), # HEAD fails -> fallback to listed size/etag + _obj("c"), + ] + + result = asyncio.run(handler._process_list_objects(client, "bucket", contents)) + + # Internal key dropped, order preserved. + assert [o["key"] for o in result] == ["a", "b", "c"] + # Successful HEAD -> plaintext size + client-etag. + assert result[0]["size"] == 111 + assert result[0]["etag"] == "etag-a" + # Failed HEAD -> fallback to listed size + stripped raw etag. + assert result[1]["size"] == 42 + assert result[1]["etag"] == "raw-etag" + # HEADs actually ran concurrently (would be 1 if sequential), and stayed bounded. + assert handler.max_inflight > 1 + assert handler.max_inflight <= buckets.LIST_HEAD_CONCURRENCY + + +if __name__ == "__main__": + test_parallel_order_and_fallback() + print("ok") From 0ca5ecc4d1928da46aa6fd610ed31c3025f74fc9 Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Wed, 24 Jun 2026 11:51:04 +0200 Subject: [PATCH 2/2] fix: parenthesize Py2 multi-exception except clauses so the package imports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unparenthesized except A, B: only parses on Python 3.14+ (PEP 758); older Py3 interpreters reject it as a SyntaxError (in Py2 it meant catch A, bind to name B) — it broke 'import s3proxy', so the whole pytest suite couldn't collect (conftest imports s3proxy.concurrency). Parenthesize the 8 sites across utils, concurrency, request_handler and dashboard/* to catch both types as intended. 
Lets the list-objects self-check import the real module directly (drops the sibling-stubbing workaround). --- s3proxy/concurrency.py | 2 +- s3proxy/dashboard/collectors.py | 2 +- s3proxy/dashboard/stats_store.py | 8 ++-- s3proxy/request_handler.py | 2 +- s3proxy/utils.py | 2 +- tests/unit/test_list_objects_parallel.py | 51 +++--------------------- 6 files changed, 13 insertions(+), 54 deletions(-) diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index 2fd2486..7b4d4be 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -36,7 +36,7 @@ def _create_malloc_release() -> Callable[[], int] | None: libc.malloc_trim.argtypes = [ctypes.c_size_t] libc.malloc_trim.restype = ctypes.c_int return lambda: libc.malloc_trim(0) - except OSError, AttributeError: + except (OSError, AttributeError): return None diff --git a/s3proxy/dashboard/collectors.py b/s3proxy/dashboard/collectors.py index afad1e4..79f968e 100644 --- a/s3proxy/dashboard/collectors.py +++ b/s3proxy/dashboard/collectors.py @@ -134,7 +134,7 @@ def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str continue try: buckets.append((float(le), float(count))) - except ValueError, TypeError: + except (ValueError, TypeError): continue if total < 1 and buckets: total = max(c for _, c in buckets) diff --git a/s3proxy/dashboard/stats_store.py b/s3proxy/dashboard/stats_store.py index d581b5e..8f21391 100644 --- a/s3proxy/dashboard/stats_store.py +++ b/s3proxy/dashboard/stats_store.py @@ -561,7 +561,7 @@ async def series(self, metric: str, range_key: str) -> tuple[list[float], list[f for k, v in raw.items(): try: points.append((int(k), float(v))) - except ValueError, TypeError: + except (ValueError, TypeError): continue return bucket_series(points, window, bucket) @@ -642,7 +642,7 @@ def _hfloat(h: dict, field: bytes) -> float: return 0.0 try: return float(v) - except ValueError, TypeError: + except (ValueError, TypeError): return 0.0 @@ -652,7 +652,7 @@ def _decode_float_map(h: 
dict) -> dict[str, float]: key = k.decode() if isinstance(k, bytes) else str(k) try: out[key] = float(v) - except ValueError, TypeError: + except (ValueError, TypeError): continue return out @@ -665,7 +665,7 @@ def _loads_sample(raw: bytes) -> RequestSample | None: try: d = orjson.loads(raw) return RequestSample(**d) - except ValueError, TypeError: + except (ValueError, TypeError): return None diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index 5ac7388..12042ff 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -214,7 +214,7 @@ async def _handle_proxy_request_impl( dispatcher = RequestDispatcher(handler) try: return await dispatcher.dispatch(request, verified_creds) - except HTTPException, S3Error: + except (HTTPException, S3Error): raise except UnknownKidError as e: logger.warning("Cannot decrypt object: key not configured", kid=e.kid) diff --git a/s3proxy/utils.py b/s3proxy/utils.py index c758f6a..12be211 100644 --- a/s3proxy/utils.py +++ b/s3proxy/utils.py @@ -17,7 +17,7 @@ def parse_http_date(date_str: str | None) -> datetime | None: return None try: return parsedate_to_datetime(date_str) - except ValueError, TypeError: + except (ValueError, TypeError): return None diff --git a/tests/unit/test_list_objects_parallel.py b/tests/unit/test_list_objects_parallel.py index af06c75..e649141 100644 --- a/tests/unit/test_list_objects_parallel.py +++ b/tests/unit/test_list_objects_parallel.py @@ -3,59 +3,18 @@ Sequential HEADs on a recursive list stack into a client-timeout-tripping stall. This proves: HEADs run concurrently, output order matches input order, internal keys are skipped, and a failing HEAD falls back to the listed size/etag. - -The repo's package __init__ chain currently pulls in modules with pre-existing -Py2 `except A, B:` syntax (utils.py, dashboard/*) that won't import under Py3, -so we load buckets.py directly with stubbed siblings to exercise the real code. 
""" import asyncio import datetime as dt -import importlib.util -import sys -import types -from pathlib import Path - -REPO = Path(__file__).resolve().parents[2] - - -def _load_buckets(): - def stub(name, **attrs): - m = types.ModuleType(name) - for k, v in attrs.items(): - setattr(m, k, v) - sys.modules[name] = m - return m - - stub("s3proxy") - stub("s3proxy.handlers") - stub("s3proxy.xml_responses") - stub("s3proxy.client", S3Credentials=object) - stub("s3proxy.errors", S3Error=type("S3Error", (Exception,), {})) - stub( - "s3proxy.state", - INTERNAL_PREFIX="s3proxy-internal/", - META_SUFFIX_LEGACY=".s3proxy-meta", - delete_multipart_metadata=lambda *a, **k: None, - ) - stub("s3proxy.xml_utils", find_element=lambda *a, **k: None, find_elements=lambda *a, **k: []) - stub("s3proxy.handlers.base", BaseHandler=object) - - spec = importlib.util.spec_from_file_location( - "s3proxy.handlers.buckets", REPO / "s3proxy" / "handlers" / "buckets.py" - ) - mod = importlib.util.module_from_spec(spec) - sys.modules["s3proxy.handlers.buckets"] = mod - spec.loader.exec_module(mod) - return mod - - -buckets = _load_buckets() + +from s3proxy.handlers.buckets import LIST_HEAD_CONCURRENCY, BucketHandlerMixin + INTERNAL_PREFIX = "s3proxy-internal/" class FakeHandler: - _process_list_objects = buckets.BucketHandlerMixin._process_list_objects + _process_list_objects = BucketHandlerMixin._process_list_objects def __init__(self): self.inflight = 0 @@ -120,7 +79,7 @@ def test_parallel_order_and_fallback(): assert result[1]["etag"] == "raw-etag" # HEADs actually ran concurrently (would be 1 if sequential), and stayed bounded. assert handler.max_inflight > 1 - assert handler.max_inflight <= buckets.LIST_HEAD_CONCURRENCY + assert handler.max_inflight <= LIST_HEAD_CONCURRENCY if __name__ == "__main__":