diff --git a/s3proxy/concurrency.py b/s3proxy/concurrency.py index 2fd2486..7b4d4be 100644 --- a/s3proxy/concurrency.py +++ b/s3proxy/concurrency.py @@ -36,7 +36,7 @@ def _create_malloc_release() -> Callable[[], int] | None: libc.malloc_trim.argtypes = [ctypes.c_size_t] libc.malloc_trim.restype = ctypes.c_int return lambda: libc.malloc_trim(0) - except OSError, AttributeError: + except (OSError, AttributeError): return None diff --git a/s3proxy/dashboard/collectors.py b/s3proxy/dashboard/collectors.py index afad1e4..79f968e 100644 --- a/s3proxy/dashboard/collectors.py +++ b/s3proxy/dashboard/collectors.py @@ -134,7 +134,7 @@ def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str continue try: buckets.append((float(le), float(count))) - except ValueError, TypeError: + except (ValueError, TypeError): continue if total < 1 and buckets: total = max(c for _, c in buckets) diff --git a/s3proxy/dashboard/stats_store.py b/s3proxy/dashboard/stats_store.py index d581b5e..8f21391 100644 --- a/s3proxy/dashboard/stats_store.py +++ b/s3proxy/dashboard/stats_store.py @@ -561,7 +561,7 @@ async def series(self, metric: str, range_key: str) -> tuple[list[float], list[f for k, v in raw.items(): try: points.append((int(k), float(v))) - except ValueError, TypeError: + except (ValueError, TypeError): continue return bucket_series(points, window, bucket) @@ -642,7 +642,7 @@ def _hfloat(h: dict, field: bytes) -> float: return 0.0 try: return float(v) - except ValueError, TypeError: + except (ValueError, TypeError): return 0.0 @@ -652,7 +652,7 @@ def _decode_float_map(h: dict) -> dict[str, float]: key = k.decode() if isinstance(k, bytes) else str(k) try: out[key] = float(v) - except ValueError, TypeError: + except (ValueError, TypeError): continue return out @@ -665,7 +665,7 @@ def _loads_sample(raw: bytes) -> RequestSample | None: try: d = orjson.loads(raw) return RequestSample(**d) - except ValueError, TypeError: + except (ValueError, TypeError): return None 
diff --git a/s3proxy/handlers/buckets.py b/s3proxy/handlers/buckets.py index 41d7647..fef9b07 100644 --- a/s3proxy/handlers/buckets.py +++ b/s3proxy/handlers/buckets.py @@ -20,6 +20,9 @@ logger: BoundLogger = structlog.get_logger() +# Max concurrent HEAD requests when resolving plaintext sizes for a list page. +LIST_HEAD_CONCURRENCY = 50 + def _strip_minio_cache_suffix(value: str | None) -> str | None: """Strip MinIO cache metadata suffix from marker/token values. @@ -170,28 +173,32 @@ def _is_internal_key(self, key: str) -> bool: ) async def _process_list_objects(self, client, bucket: str, contents: list[dict]) -> list[dict]: - objects = [] - for obj in contents: - if self._is_internal_key(obj["Key"]): - continue - try: - head = await client.head_object(bucket, obj["Key"]) - meta = head.get("Metadata", {}) - size = self._get_plaintext_size(meta, obj.get("Size", 0)) - etag = self._get_effective_etag(meta, obj.get("ETag", "")) - except Exception: - size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"') - - objects.append( - { - "key": obj["Key"], - "last_modified": obj["LastModified"].isoformat(), - "etag": etag, - "size": size, - "storage_class": obj.get("StorageClass", "STANDARD"), - } - ) - return objects + # One HEAD per object is needed to recover the SSE plaintext size/etag. + # Run them concurrently (bounded) — sequential HEADs on a recursive list + # of up to max-keys objects stack into a multi-second stall that trips + # client timeouts. ponytail: fixed fan-out cap; raise LIST_HEAD_CONCURRENCY + # if backend HEAD latency dominates large pages. 
+ listable = [obj for obj in contents if not self._is_internal_key(obj["Key"])] + sem = asyncio.Semaphore(LIST_HEAD_CONCURRENCY) + + async def resolve(obj: dict) -> dict: + async with sem: + try: + head = await client.head_object(bucket, obj["Key"]) + meta = head.get("Metadata", {}) + size = self._get_plaintext_size(meta, obj.get("Size", 0)) + etag = self._get_effective_etag(meta, obj.get("ETag", "")) + except Exception: + size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"') + return { + "key": obj["Key"], + "last_modified": obj["LastModified"].isoformat(), + "etag": etag, + "size": size, + "storage_class": obj.get("StorageClass", "STANDARD"), + } + + return await asyncio.gather(*[resolve(obj) for obj in listable]) async def handle_create_bucket(self, request: Request, creds: S3Credentials) -> Response: bucket = self._parse_bucket(request.url.path) diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index 5ac7388..12042ff 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -214,7 +214,7 @@ async def _handle_proxy_request_impl( dispatcher = RequestDispatcher(handler) try: return await dispatcher.dispatch(request, verified_creds) - except HTTPException, S3Error: + except (HTTPException, S3Error): raise except UnknownKidError as e: logger.warning("Cannot decrypt object: key not configured", kid=e.kid) diff --git a/s3proxy/utils.py b/s3proxy/utils.py index c758f6a..12be211 100644 --- a/s3proxy/utils.py +++ b/s3proxy/utils.py @@ -17,7 +17,7 @@ def parse_http_date(date_str: str | None) -> datetime | None: return None try: return parsedate_to_datetime(date_str) - except ValueError, TypeError: + except (ValueError, TypeError): return None diff --git a/tests/unit/test_list_objects_parallel.py b/tests/unit/test_list_objects_parallel.py new file mode 100644 index 0000000..e649141 --- /dev/null +++ b/tests/unit/test_list_objects_parallel.py @@ -0,0 +1,87 @@ +"""Self-check for the parallel HEAD fan-out in 
"""Self-check for the parallel HEAD fan-out in BucketHandlerMixin._process_list_objects.

Sequential HEADs on a recursive list stack into a client-timeout-tripping stall.
This proves: HEADs run concurrently, output order matches input order, internal
keys are skipped, a failing HEAD falls back to the listed size/etag, and the
fan-out saturates at (and never exceeds) LIST_HEAD_CONCURRENCY.
"""

import asyncio
import datetime as dt

from s3proxy.handlers.buckets import LIST_HEAD_CONCURRENCY, BucketHandlerMixin

INTERNAL_PREFIX = "s3proxy-internal/"


class FakeHandler:
    """Stand-in for the handler: borrows the real method, stubs its helpers."""

    _process_list_objects = BucketHandlerMixin._process_list_objects

    def __init__(self):
        # In-flight HEAD counters, updated by FakeClient.head_object.
        self.inflight = 0
        self.max_inflight = 0

    def _is_internal_key(self, key):
        return key.startswith(INTERNAL_PREFIX)

    def _get_plaintext_size(self, meta, fallback):
        return int(meta.get("plaintext-size", fallback))

    def _get_effective_etag(self, meta, fallback):
        return meta.get("client-etag", fallback.strip('"'))


class FakeClient:
    """Backend double whose HEAD yields once and optionally fails for one key."""

    def __init__(self, handler, fail_key=None):
        self.handler = handler
        self.fail_key = fail_key

    async def head_object(self, bucket, key):
        self.handler.inflight += 1
        self.handler.max_inflight = max(self.handler.max_inflight, self.handler.inflight)
        try:
            # A single event-loop yield is enough for sibling HEADs to start,
            # so overlap is observable without sleeping real time.
            await asyncio.sleep(0)
            if key == self.fail_key:
                raise RuntimeError("backend HEAD failed")
            return {"Metadata": {"plaintext-size": "111", "client-etag": f"etag-{key}"}}
        finally:
            self.handler.inflight -= 1


def _obj(key, size=999):
    """Return a raw listing entry shaped like the ones the handler receives."""
    return {
        "Key": key,
        "Size": size,
        "ETag": '"raw-etag"',
        "LastModified": dt.datetime(2026, 6, 24, 9, 0, 0),
        "StorageClass": "STANDARD",
    }


def test_parallel_order_and_fallback():
    handler = FakeHandler()
    client = FakeClient(handler, fail_key="b")
    contents = [
        _obj("a"),
        _obj(f"{INTERNAL_PREFIX}skip-me"),  # internal -> dropped
        _obj("b", size=42),  # HEAD fails -> fallback to listed size/etag
        _obj("c"),
    ]

    result = asyncio.run(handler._process_list_objects(client, "bucket", contents))

    # Internal key dropped, order preserved.
    assert [o["key"] for o in result] == ["a", "b", "c"]
    # Successful HEAD -> plaintext size + client-etag.
    assert result[0]["size"] == 111
    assert result[0]["etag"] == "etag-a"
    # Failed HEAD -> fallback to listed size + stripped raw etag.
    assert result[1]["size"] == 42
    assert result[1]["etag"] == "raw-etag"
    # HEADs actually ran concurrently (would be 1 if sequential).
    assert handler.max_inflight > 1
    assert handler.max_inflight <= LIST_HEAD_CONCURRENCY


def test_fan_out_is_bounded():
    # Three objects can never exceed the cap, so the bound is only exercised
    # by listing more objects than there are slots.
    handler = FakeHandler()
    client = FakeClient(handler)
    contents = [_obj(f"k{i:03d}") for i in range(LIST_HEAD_CONCURRENCY + 10)]

    result = asyncio.run(handler._process_list_objects(client, "bucket", contents))

    assert [o["key"] for o in result] == [o["Key"] for o in contents]
    # The fan-out fills every slot and stops there.
    assert handler.max_inflight == LIST_HEAD_CONCURRENCY


if __name__ == "__main__":
    test_parallel_order_and_fallback()
    test_fan_out_is_bounded()
    print("ok")