Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion s3proxy/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _create_malloc_release() -> Callable[[], int] | None:
libc.malloc_trim.argtypes = [ctypes.c_size_t]
libc.malloc_trim.restype = ctypes.c_int
return lambda: libc.malloc_trim(0)
except OSError, AttributeError:
except (OSError, AttributeError):
return None


Expand Down
2 changes: 1 addition & 1 deletion s3proxy/dashboard/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str
continue
try:
buckets.append((float(le), float(count)))
except ValueError, TypeError:
except (ValueError, TypeError):
continue
if total < 1 and buckets:
total = max(c for _, c in buckets)
Expand Down
8 changes: 4 additions & 4 deletions s3proxy/dashboard/stats_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ async def series(self, metric: str, range_key: str) -> tuple[list[float], list[f
for k, v in raw.items():
try:
points.append((int(k), float(v)))
except ValueError, TypeError:
except (ValueError, TypeError):
continue
return bucket_series(points, window, bucket)

Expand Down Expand Up @@ -642,7 +642,7 @@ def _hfloat(h: dict, field: bytes) -> float:
return 0.0
try:
return float(v)
except ValueError, TypeError:
except (ValueError, TypeError):
return 0.0


Expand All @@ -652,7 +652,7 @@ def _decode_float_map(h: dict) -> dict[str, float]:
key = k.decode() if isinstance(k, bytes) else str(k)
try:
out[key] = float(v)
except ValueError, TypeError:
except (ValueError, TypeError):
continue
return out

Expand All @@ -665,7 +665,7 @@ def _loads_sample(raw: bytes) -> RequestSample | None:
try:
d = orjson.loads(raw)
return RequestSample(**d)
except ValueError, TypeError:
except (ValueError, TypeError):
return None


Expand Down
51 changes: 29 additions & 22 deletions s3proxy/handlers/buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

logger: BoundLogger = structlog.get_logger()

# Max concurrent HEAD requests when resolving plaintext sizes for a list page.
LIST_HEAD_CONCURRENCY = 50


def _strip_minio_cache_suffix(value: str | None) -> str | None:
"""Strip MinIO cache metadata suffix from marker/token values.
Expand Down Expand Up @@ -170,28 +173,32 @@ def _is_internal_key(self, key: str) -> bool:
)

async def _process_list_objects(self, client, bucket: str, contents: list[dict]) -> list[dict]:
objects = []
for obj in contents:
if self._is_internal_key(obj["Key"]):
continue
try:
head = await client.head_object(bucket, obj["Key"])
meta = head.get("Metadata", {})
size = self._get_plaintext_size(meta, obj.get("Size", 0))
etag = self._get_effective_etag(meta, obj.get("ETag", ""))
except Exception:
size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"')

objects.append(
{
"key": obj["Key"],
"last_modified": obj["LastModified"].isoformat(),
"etag": etag,
"size": size,
"storage_class": obj.get("StorageClass", "STANDARD"),
}
)
return objects
# One HEAD per object is needed to recover the SSE plaintext size/etag.
# Run them concurrently (bounded) — sequential HEADs on a recursive list
# of up to max-keys objects stack into a multi-second stall that trips
# client timeouts. ponytail: fixed fan-out cap; raise LIST_HEAD_CONCURRENCY
# if backend HEAD latency dominates large pages.
listable = [obj for obj in contents if not self._is_internal_key(obj["Key"])]
sem = asyncio.Semaphore(LIST_HEAD_CONCURRENCY)

async def resolve(obj: dict) -> dict:
async with sem:
try:
head = await client.head_object(bucket, obj["Key"])
meta = head.get("Metadata", {})
size = self._get_plaintext_size(meta, obj.get("Size", 0))
etag = self._get_effective_etag(meta, obj.get("ETag", ""))
except Exception:
size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"')
return {
"key": obj["Key"],
"last_modified": obj["LastModified"].isoformat(),
"etag": etag,
"size": size,
"storage_class": obj.get("StorageClass", "STANDARD"),
}

return await asyncio.gather(*[resolve(obj) for obj in listable])

async def handle_create_bucket(self, request: Request, creds: S3Credentials) -> Response:
bucket = self._parse_bucket(request.url.path)
Expand Down
2 changes: 1 addition & 1 deletion s3proxy/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ async def _handle_proxy_request_impl(
dispatcher = RequestDispatcher(handler)
try:
return await dispatcher.dispatch(request, verified_creds)
except HTTPException, S3Error:
except (HTTPException, S3Error):
raise
except UnknownKidError as e:
logger.warning("Cannot decrypt object: key not configured", kid=e.kid)
Expand Down
2 changes: 1 addition & 1 deletion s3proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def parse_http_date(date_str: str | None) -> datetime | None:
return None
try:
return parsedate_to_datetime(date_str)
except ValueError, TypeError:
except (ValueError, TypeError):
return None


Expand Down
87 changes: 87 additions & 0 deletions tests/unit/test_list_objects_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Self-check for the parallel HEAD fan-out in BucketHandlerMixin._process_list_objects.

Sequential HEADs on a recursive list stack into a client-timeout-tripping stall.
This proves: HEADs run concurrently, output order matches input order, internal
keys are skipped, and a failing HEAD falls back to the listed size/etag.
"""

import asyncio
import datetime as dt

from s3proxy.handlers.buckets import LIST_HEAD_CONCURRENCY, BucketHandlerMixin

INTERNAL_PREFIX = "s3proxy-internal/"


class FakeHandler:
_process_list_objects = BucketHandlerMixin._process_list_objects

def __init__(self):
self.inflight = 0
self.max_inflight = 0

def _is_internal_key(self, key):
return key.startswith(INTERNAL_PREFIX)

def _get_plaintext_size(self, meta, fallback):
return int(meta.get("plaintext-size", fallback))

def _get_effective_etag(self, meta, fallback):
return meta.get("client-etag", fallback.strip('"'))


class FakeClient:
def __init__(self, handler, fail_key=None):
self.handler = handler
self.fail_key = fail_key

async def head_object(self, bucket, key):
self.handler.inflight += 1
self.handler.max_inflight = max(self.handler.max_inflight, self.handler.inflight)
try:
await asyncio.sleep(0.02) # simulate backend round-trip
if key == self.fail_key:
raise RuntimeError("backend HEAD failed")
return {"Metadata": {"plaintext-size": "111", "client-etag": f"etag-{key}"}}
finally:
self.handler.inflight -= 1


def _obj(key, size=999):
return {
"Key": key,
"Size": size,
"ETag": '"raw-etag"',
"LastModified": dt.datetime(2026, 6, 24, 9, 0, 0),
"StorageClass": "STANDARD",
}


def test_parallel_order_and_fallback():
handler = FakeHandler()
client = FakeClient(handler, fail_key="b")
contents = [
_obj("a"),
_obj(f"{INTERNAL_PREFIX}skip-me"), # internal -> dropped
_obj("b", size=42), # HEAD fails -> fallback to listed size/etag
_obj("c"),
]

result = asyncio.run(handler._process_list_objects(client, "bucket", contents))

# Internal key dropped, order preserved.
assert [o["key"] for o in result] == ["a", "b", "c"]
# Successful HEAD -> plaintext size + client-etag.
assert result[0]["size"] == 111
assert result[0]["etag"] == "etag-a"
# Failed HEAD -> fallback to listed size + stripped raw etag.
assert result[1]["size"] == 42
assert result[1]["etag"] == "raw-etag"
# HEADs actually ran concurrently (would be 1 if sequential), and stayed bounded.
assert handler.max_inflight > 1
assert handler.max_inflight <= LIST_HEAD_CONCURRENCY


if __name__ == "__main__":
test_parallel_order_and_fallback()
print("ok")
Loading