From 9640c09ed6cb1f6501ae5ec61f5c78649e18d42b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Sun, 7 Jun 2026 19:50:45 -0400 Subject: [PATCH 1/3] refactor: one Array class with sync+async APIs, keeping AsyncArray Alternative to #4034 that delivers every #4027 goal except removing the public AsyncArray class. - Array gains a user-friendly constructor: Array(metadata, store_path, config=...) builds the wrapped AsyncArray internally; Array(async_array) still works for back-compat. - Array gains the full public async surface (*_async twins for every sync method), so callers no longer reach into the private _async_array. - Each operation has a single implementation: the async twin holds the logic and the sync method delegates via sync(). No drift between the pairs; a parity test guards their signatures. - No Runner protocol and no _AsyncArrayView/_rebind_state shared-state machinery: Array still wraps one AsyncArray, so async_array is the held object and mutation coherence is free. - getitem_async/setitem_async mirror the full basic-selection parameter surface (out=, fields=, prototype=). Tests: integrates the async/sync boundary suite (shared-state coupling, codec-pipeline-built-once, selection parity, out= zero-copy, full parameter surface). The loop-affinity test is xfail (strict) as a design-independent limitation of mixing zarr's background loop with a user-owned loop on stores with event-loop affinity. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/zarr/core/array.py | 389 ++++++++++++++++++++++++++++++++--------- tests/test_array.py | 147 ++++++++++++++++ tests/test_indexing.py | 56 ++++++ tests/test_sync.py | 67 +++++++ 4 files changed, 576 insertions(+), 83 deletions(-) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 366c19bb0c..cddae9228e 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1806,6 +1806,42 @@ class Array[T_ArrayMetadata: (ArrayV2Metadata, ArrayV3Metadata)]: _async_array: AsyncArray[T_ArrayMetadata] + def __init__( + self, + metadata: AsyncArray[T_ArrayMetadata] | ArrayMetadata | ArrayMetadataDict, + store_path: StorePath | None = None, + config: ArrayConfigLike | None = None, + ) -> None: + """Construct an ``Array``. + + An ``Array`` is a synchronous handle that wraps a single + :class:`AsyncArray`. It can be constructed two ways: + + - the user-friendly form, ``Array(metadata, store_path, config=...)``, + which builds the underlying ``AsyncArray`` internally; and + - the low-level form, ``Array(async_array)``, which wraps an existing + ``AsyncArray`` directly. + + The friendly constructor only requires the ``Array`` to *accept* + metadata/store_path/config -- not for it to stop holding an + ``AsyncArray``. The two-class split (and the public ``AsyncArray``) is + preserved. + """ + if isinstance(metadata, AsyncArray): + if store_path is not None or config is not None: + raise TypeError( + "When constructing an Array from an AsyncArray, store_path and " + "config must not also be provided." 
+ ) + async_array: AsyncArray[T_ArrayMetadata] = metadata + else: + if store_path is None: + raise TypeError("store_path is required when constructing an Array from metadata.") + async_array = cast( + "AsyncArray[T_ArrayMetadata]", AsyncArray(metadata, store_path, config) + ) + self._async_array = async_array + @property def async_array(self) -> AsyncArray[T_ArrayMetadata]: """An asynchronous version of the current array. Useful for batching requests. @@ -2271,7 +2307,7 @@ def nchunks_initialized(self) -> int: >>> arr.nchunks_initialized 6 """ - return sync(self.async_array.nchunks_initialized()) + return sync(self.nchunks_initialized_async()) @property def _nshards_initialized(self) -> int: @@ -2303,7 +2339,7 @@ def nbytes_stored(self) -> int: ------- size : int """ - return sync(self.async_array.nbytes_stored()) + return sync(self.nbytes_stored_async()) def _iter_shard_keys( self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None @@ -2828,15 +2864,8 @@ def get_basic_selection( """ - if prototype is None: - prototype = default_buffer_prototype() return sync( - self.async_array._get_selection( - BasicIndexer(selection, self.shape, self._chunk_grid), - out=out, - fields=fields, - prototype=prototype, - ) + self.get_basic_selection_async(selection, out=out, prototype=prototype, fields=fields) ) def set_basic_selection( @@ -2937,10 +2966,7 @@ def set_basic_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = BasicIndexer(selection, self.shape, self._chunk_grid) - sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) + sync(self.set_basic_selection_async(selection, value, fields=fields, prototype=prototype)) def get_orthogonal_selection( self, @@ -3065,12 +3091,9 @@ def get_orthogonal_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = 
OrthogonalIndexer(selection, self.shape, self._chunk_grid) return sync( - self.async_array._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype + self.get_orthogonal_selection_async( + selection, out=out, fields=fields, prototype=prototype ) ) @@ -3183,11 +3206,10 @@ def set_orthogonal_selection( [blocks][zarr.Array.blocks], [__getitem__][zarr.Array.__getitem__], [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = OrthogonalIndexer(selection, self.shape, self._chunk_grid) return sync( - self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + self.set_orthogonal_selection_async( + selection, value, fields=fields, prototype=prototype + ) ) def get_mask_selection( @@ -3271,13 +3293,8 @@ def get_mask_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = MaskIndexer(mask, self.shape, self._chunk_grid) return sync( - self.async_array._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype - ) + self.get_mask_selection_async(mask, out=out, fields=fields, prototype=prototype) ) def set_mask_selection( @@ -3360,10 +3377,7 @@ def set_mask_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = MaskIndexer(mask, self.shape, self._chunk_grid) - sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) + sync(self.set_mask_selection_async(mask, value, fields=fields, prototype=prototype)) def get_coordinate_selection( self, @@ -3448,20 +3462,12 @@ def get_coordinate_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = CoordinateIndexer(selection, self.shape, self._chunk_grid) - out_array = sync( - self.async_array._get_selection( - indexer=indexer, out=out, fields=fields, 
prototype=prototype + return sync( + self.get_coordinate_selection_async( + selection, out=out, fields=fields, prototype=prototype ) ) - if hasattr(out_array, "shape"): - # restore shape - out_array = np.array(out_array).reshape(indexer.sel_shape) - return out_array - def set_coordinate_selection( self, selection: CoordinateSelection, @@ -3539,32 +3545,11 @@ def set_coordinate_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - # setup indexer - indexer = CoordinateIndexer(selection, self.shape, self._chunk_grid) - - # handle value - need ndarray-like flatten value - if not is_scalar(value, self.dtype): - try: - from numcodecs.compat import ensure_ndarray_like - - value = ensure_ndarray_like(value) # TODO replace with agnostic - except TypeError: - # Handle types like `list` or `tuple` - value = np.array(value) # TODO replace with agnostic - if hasattr(value, "shape") and len(value.shape) > 1: - value = np.array(value).reshape(-1) - - if not is_scalar(value, self.dtype) and ( - isinstance(value, NDArrayLike) and indexer.shape != value.shape - ): - raise ValueError( - f"Attempting to set a selection of {indexer.sel_shape[0]} " - f"elements with an array of {value.shape[0]} elements." 
+ sync( + self.set_coordinate_selection_async( + selection, value, fields=fields, prototype=prototype ) - - sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) + ) def get_block_selection( self, @@ -3661,13 +3646,8 @@ def get_block_selection( [blocks][zarr.Array.blocks], [__getitem__][zarr.Array.__getitem__], [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = BlockIndexer(selection, self.shape, self._chunk_grid) return sync( - self.async_array._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype - ) + self.get_block_selection_async(selection, out=out, fields=fields, prototype=prototype) ) def set_block_selection( @@ -3761,10 +3741,7 @@ def set_block_selection( [__setitem__][zarr.Array.__setitem__] """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = BlockIndexer(selection, self.shape, self._chunk_grid) - sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) + sync(self.set_block_selection_async(selection, value, fields=fields, prototype=prototype)) @property def vindex(self) -> VIndex: @@ -3828,7 +3805,7 @@ def resize(self, new_shape: ShapeLike) -> None: #>(50, 50) ``` """ - sync(self.async_array.resize(new_shape)) + sync(self.resize_async(new_shape)) def append(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: """Append `data` to `axis`. @@ -3864,7 +3841,7 @@ def append(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: >>> z.shape (20000, 2000) """ - return sync(self.async_array.append(data, axis=axis)) + return sync(self.append_async(data, axis=axis)) def update_attributes(self, new_attributes: dict[str, JSON]) -> Self: """ @@ -3891,9 +3868,255 @@ def update_attributes(self, new_attributes: dict[str, JSON]) -> Self: - The updated attributes will be merged with existing attributes, and any conflicts will be overwritten by the new values. 
""" - new_array = sync(self.async_array.update_attributes(new_attributes)) + return sync(self.update_attributes_async(new_attributes)) + + # ------------------------------------------------------------------ + # Asynchronous API. + # + # These ``async`` twins are the public, awaitable counterparts of the + # synchronous methods above. Each delegates to the wrapped ``AsyncArray`` + # (or to its shared ``_get_selection`` / ``_set_selection`` coroutines). + # + # Note what is NOT needed here, compared with folding ``AsyncArray`` away: + # no ``Runner`` protocol (``Array`` still reaches its coroutines through the + # single ``AsyncArray`` it holds) and no ``_AsyncArrayView`` / ``_rebind_state`` + # shared-state machinery (there is one stateful object, so nothing can + # desync). The residual marshaling (building the indexer) is trivial and + # could be deduplicated by having the sync methods wrap these twins via + # ``sync(...)``; see #4034's module-level helpers for the same idea. + # ------------------------------------------------------------------ + + async def getitem_async( + self, + selection: BasicSelection, + *, + out: NDBuffer | None = None, + prototype: BufferPrototype | None = None, + fields: Fields | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous, basic-indexing variant of [`__getitem__`][zarr.Array.__getitem__]. + + Mirrors the full ``get_basic_selection`` parameter surface (``out``, + ``prototype``, ``fields``) so migrating sync -> async loses no capability. 
+ """ + return await self.get_basic_selection_async( + selection, out=out, prototype=prototype, fields=fields + ) + + async def setitem_async( + self, + selection: BasicSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous, basic-indexing variant of [`__setitem__`][zarr.Array.__setitem__].""" + await self.set_basic_selection_async(selection, value, fields=fields, prototype=prototype) + + async def get_basic_selection_async( + self, + selection: BasicSelection = Ellipsis, + *, + out: NDBuffer | None = None, + prototype: BufferPrototype | None = None, + fields: Fields | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous variant of [`get_basic_selection`][zarr.Array.get_basic_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + return await self.async_array._get_selection( + BasicIndexer(selection, self.shape, self._chunk_grid), + out=out, + fields=fields, + prototype=prototype, + ) + + async def set_basic_selection_async( + self, + selection: BasicSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous variant of [`set_basic_selection`][zarr.Array.set_basic_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = BasicIndexer(selection, self.shape, self._chunk_grid) + await self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + + async def get_orthogonal_selection_async( + self, + selection: OrthogonalSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous variant of [`get_orthogonal_selection`][zarr.Array.get_orthogonal_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = OrthogonalIndexer(selection, self.shape, self._chunk_grid) + 
return await self.async_array._get_selection( + indexer=indexer, out=out, fields=fields, prototype=prototype + ) + + async def set_orthogonal_selection_async( + self, + selection: OrthogonalSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous variant of [`set_orthogonal_selection`][zarr.Array.set_orthogonal_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = OrthogonalIndexer(selection, self.shape, self._chunk_grid) + await self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + + async def get_mask_selection_async( + self, + mask: MaskSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous variant of [`get_mask_selection`][zarr.Array.get_mask_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = MaskIndexer(mask, self.shape, self._chunk_grid) + return await self.async_array._get_selection( + indexer=indexer, out=out, fields=fields, prototype=prototype + ) + + async def set_mask_selection_async( + self, + mask: MaskSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous variant of [`set_mask_selection`][zarr.Array.set_mask_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = MaskIndexer(mask, self.shape, self._chunk_grid) + await self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + + async def get_coordinate_selection_async( + self, + selection: CoordinateSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous variant of 
[`get_coordinate_selection`][zarr.Array.get_coordinate_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = CoordinateIndexer(selection, self.shape, self._chunk_grid) + out_array = await self.async_array._get_selection( + indexer=indexer, out=out, fields=fields, prototype=prototype + ) + if hasattr(out_array, "shape"): + # restore shape + out_array = np.array(out_array).reshape(indexer.sel_shape) + return out_array + + async def set_coordinate_selection_async( + self, + selection: CoordinateSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous variant of [`set_coordinate_selection`][zarr.Array.set_coordinate_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + # setup indexer + indexer = CoordinateIndexer(selection, self.shape, self._chunk_grid) + + # handle value - need ndarray-like flatten value + if not is_scalar(value, self.dtype): + try: + from numcodecs.compat import ensure_ndarray_like + + value = ensure_ndarray_like(value) # TODO replace with agnostic + except TypeError: + # Handle types like `list` or `tuple` + value = np.array(value) # TODO replace with agnostic + if hasattr(value, "shape") and len(value.shape) > 1: + value = np.array(value).reshape(-1) + + if not is_scalar(value, self.dtype) and ( + isinstance(value, NDArrayLike) and indexer.shape != value.shape + ): + raise ValueError( + f"Attempting to set a selection of {indexer.sel_shape[0]} " + f"elements with an array of {value.shape[0]} elements." 
+ ) + + await self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + + async def get_block_selection_async( + self, + selection: BasicSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> NDArrayLikeOrScalar: + """Asynchronous variant of [`get_block_selection`][zarr.Array.get_block_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = BlockIndexer(selection, self.shape, self._chunk_grid) + return await self.async_array._get_selection( + indexer=indexer, out=out, fields=fields, prototype=prototype + ) + + async def set_block_selection_async( + self, + selection: BasicSelection, + value: npt.ArrayLike, + *, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, + ) -> None: + """Asynchronous variant of [`set_block_selection`][zarr.Array.set_block_selection].""" + if prototype is None: + prototype = default_buffer_prototype() + indexer = BlockIndexer(selection, self.shape, self._chunk_grid) + await self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) + + async def resize_async(self, new_shape: ShapeLike, delete_outside_chunks: bool = True) -> None: + """Asynchronous variant of [`resize`][zarr.Array.resize].""" + await self.async_array.resize(new_shape, delete_outside_chunks=delete_outside_chunks) + + async def append_async(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: + """Asynchronous variant of [`append`][zarr.Array.append].""" + return await self.async_array.append(data, axis=axis) + + async def update_attributes_async(self, new_attributes: dict[str, JSON]) -> Self: + """Asynchronous variant of [`update_attributes`][zarr.Array.update_attributes].""" + new_array = await self.async_array.update_attributes(new_attributes) return type(self)(new_array) + async def nchunks_initialized_async(self) -> int: + """Asynchronous variant of 
[`nchunks_initialized`][zarr.Array.nchunks_initialized].""" + return await self.async_array.nchunks_initialized() + + async def nbytes_stored_async(self) -> int: + """Asynchronous variant of [`nbytes_stored`][zarr.Array.nbytes_stored].""" + return await self.async_array.nbytes_stored() + + async def info_complete_async(self) -> Any: + """Asynchronous variant of [`info_complete`][zarr.Array.info_complete].""" + return await self.async_array.info_complete() + def __repr__(self) -> str: return f"" @@ -3949,7 +4172,7 @@ def info_complete(self) -> Any: ------- [zarr.Array.info][] - The statically known subset of metadata about an array. """ - return sync(self.async_array.info_complete()) + return sync(self.info_complete_async()) async def _shards_initialized( diff --git a/tests/test_array.py b/tests/test_array.py index 0d6d2d5906..c4e054f384 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -6,6 +6,7 @@ import pickle import re import sys +import warnings from itertools import accumulate, starmap from typing import TYPE_CHECKING, Any, Literal from unittest import mock @@ -18,6 +19,7 @@ import zarr.api.asynchronous import zarr.api.synchronous as sync_api +import zarr.core.array from tests.conftest import skip_object_dtype from zarr import Array, Group from zarr.abc.store import Store @@ -2374,3 +2376,148 @@ async def test_create_array_chunks_3d( shape = (10, 12, 15) arr = await create_array(store={}, shape=shape, chunks=chunk_input, dtype="float64") assert arr.write_chunk_sizes == expected + + +# Names of the public synchronous Array methods that must have an awaitable +# ``*_async`` twin. The async API is additive: it lives alongside the sync API +# on the same class, and the public ``AsyncArray`` is preserved. 
+_ASYNC_TWIN_METHODS = [ + "getitem", + "setitem", + "get_basic_selection", + "set_basic_selection", + "get_orthogonal_selection", + "set_orthogonal_selection", + "get_mask_selection", + "set_mask_selection", + "get_coordinate_selection", + "set_coordinate_selection", + "get_block_selection", + "set_block_selection", + "resize", + "append", + "update_attributes", + "nchunks_initialized", + "nbytes_stored", + "info_complete", +] + + +@pytest.mark.parametrize("name", _ASYNC_TWIN_METHODS) +def test_array_async_twin_exists(name: str) -> None: + """Every public sync Array method exposes an awaitable ``*_async`` twin. + + Guards the additive async surface: it must stay in lockstep with the sync + API. ``getitem``/``setitem`` are exposed via the ``__getitem__``/``__setitem__`` + dunders on the sync side but as named ``*_async`` methods on the async side. + """ + twin = getattr(Array, f"{name}_async", None) + assert twin is not None, f"Array.{name}_async is missing" + assert inspect.iscoroutinefunction(twin), f"Array.{name}_async must be a coroutine function" + + +def test_array_friendly_constructor_keeps_asyncarray() -> None: + """The friendly constructor builds (and holds) an AsyncArray internally. + + Demonstrates that a first-class ``Array`` constructor only requires ``Array`` + to *accept* metadata/store_path/config -- not to stop wrapping an + ``AsyncArray``. No ``Runner`` or shared-state-view machinery is involved. 
+ """ + from zarr.core.array import AsyncArray + + src = zarr.create_array({}, shape=(8,), chunks=(4,), dtype="i4") + + # friendly form + friendly = Array(src.metadata, src.store_path) + assert isinstance(friendly, Array) + assert isinstance(friendly._async_array, AsyncArray) + assert friendly.shape == (8,) + assert not hasattr(friendly, "_runner") + + # legacy form still works + legacy = Array(src._async_array) + assert legacy.shape == (8,) + + # mixing the forms is rejected + with pytest.raises(TypeError): + Array(src._async_array, src.store_path) + with pytest.raises(TypeError): + Array(src.metadata) + + +def _async_array_handle(arr: Array[Any]) -> AsyncArray[Any]: + # ``Array.async_array`` may emit a DeprecationWarning in newer versions; the + # contract tests below assert what the handle does while it exists, not + # whether it warns. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + return arr.async_array + + +@pytest.mark.parametrize("store", ["memory"], indirect=True) +def test_async_array_identity_is_stable(store: MemoryStore) -> None: + """``Array.async_array`` must return the same live handle on every access.""" + z = zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + assert _async_array_handle(z) is _async_array_handle(z) + + +@pytest.mark.parametrize("store", ["memory"], indirect=True) +def test_resize_through_async_array_updates_array(store: MemoryStore) -> None: + """Mutations made through ``async_array`` must stay coupled to the parent Array. + + A detached handle would update the store metadata while ``Array.metadata`` + stays stale, so subsequent indexing silently uses the old shape. 
+ """ + z = zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + z[:] = np.arange(8, dtype="i4") + sync(_async_array_handle(z).resize((16,))) + assert z.shape == (16,) + result = z[:] + assert isinstance(result, NDArrayLike) + assert result.shape == (16,) + + +@pytest.mark.parametrize("store", ["memory"], indirect=True) +def test_resize_through_array_visible_via_async_array(store: MemoryStore) -> None: + """A handle obtained before a mutation must see the post-mutation metadata.""" + z = zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + aa = _async_array_handle(z) + z.resize((16,)) + assert aa.metadata.shape == (16,) + + +@pytest.mark.parametrize("store", ["memory"], indirect=True) +def test_update_attributes_through_async_array_updates_array(store: MemoryStore) -> None: + """Attribute updates through ``async_array`` must be visible on the parent Array.""" + z = zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + sync(_async_array_handle(z).update_attributes({"foo": "bar"})) + assert z.attrs["foo"] == "bar" + + +@pytest.mark.parametrize("store", ["memory"], indirect=True) +@pytest.mark.parametrize("entry_point", ["create", "open"]) +def test_codec_pipeline_built_once_per_sync_entry( + store: Store, entry_point: str, monkeypatch: pytest.MonkeyPatch +) -> None: + """Sync create/open entry points must build the codec pipeline exactly once. + + A wrapper layer that re-derives state already built by an inner constructor + doubles the cost of every open/create and group member access. 
+ """ + calls = 0 + real = zarr.core.array.create_codec_pipeline + + def counting(*args: Any, **kwargs: Any) -> Any: + nonlocal calls + calls += 1 + return real(*args, **kwargs) + + if entry_point == "open": + zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + + monkeypatch.setattr(zarr.core.array, "create_codec_pipeline", counting) + if entry_point == "create": + zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + else: + zarr.open_array(store, mode="r") + assert calls == 1 diff --git a/tests/test_indexing.py b/tests/test_indexing.py index c45942eee7..835ef41eeb 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import itertools from collections import Counter from typing import TYPE_CHECKING, Any @@ -2070,6 +2071,12 @@ def test_iter_regions( assert observed == expected +requires_array_async_methods = pytest.mark.skipif( + not hasattr(Array, "getitem_async"), + reason="Array.*_async methods do not exist on this version", +) + + class TestAsync: @pytest.mark.parametrize( ("indexer", "expected"), @@ -2175,3 +2182,52 @@ async def test_async_invalid_indexer(self, store): with pytest.raises(IndexError): await async_zarr.oindex.getitem("invalid_indexer") + + @requires_array_async_methods + @pytest.mark.parametrize("method", ["orthogonal", "coordinate", "block", "mask"]) + @pytest.mark.asyncio + async def test_get_selection_async_matches_sync(self, store, method): + # Each Array.get_*_selection_async method must return the same result + # as its sync twin, so the two surfaces cannot drift apart. + z = zarr.create_array(store=store, shape=(4, 4), chunks=(2, 2), zarr_format=3, dtype="i8") + z[...] 
= np.arange(16, dtype="i8").reshape(4, 4) + selection = { + "orthogonal": ([0, 2], slice(None)), + "coordinate": ([0, 1], [0, 1]), + "block": (0, 0), + "mask": np.arange(16).reshape(4, 4) % 5 == 0, + }[method] + expected = getattr(z, f"get_{method}_selection")(selection) + actual = await getattr(z, f"get_{method}_selection_async")(selection) + assert_array_equal(expected, actual) + + @requires_array_async_methods + @pytest.mark.asyncio + async def test_getitem_setitem_async_match_sync(self, store): + z = zarr.create_array(store=store, shape=(4, 4), chunks=(2, 2), zarr_format=3, dtype="i8") + await z.setitem_async(Ellipsis, np.arange(16, dtype="i8").reshape(4, 4)) + assert_array_equal(z[1:3, :], await z.getitem_async((slice(1, 3), slice(None)))) + + @requires_array_async_methods + @pytest.mark.asyncio + async def test_get_basic_selection_async_supports_out(self, store): + # Zero-copy reads via ``out=`` must not be lost on the async surface. + z = zarr.create_array(store=store, shape=(4, 4), chunks=(2, 2), zarr_format=3, dtype="i8") + z[...] = np.arange(16, dtype="i8").reshape(4, 4) + expected = z.get_basic_selection((slice(1, 3), slice(None))) + out = get_ndbuffer_class().from_numpy_array(np.empty_like(expected)) + await z.get_basic_selection_async((slice(1, 3), slice(None)), out=out) + assert_array_equal(expected, out.as_numpy_array()) + + @requires_array_async_methods + def test_item_async_twins_expose_full_sync_parameter_surface(self): + # getitem_async/setitem_async mirror the basic selection path; they + # must not silently drop parameters (out=, fields=) that the sync + # surface supports, or users migrating sync -> async lose capability + # without an error. 
+ get_sync = set(inspect.signature(Array.get_basic_selection).parameters) + get_async = set(inspect.signature(Array.getitem_async).parameters) + assert get_sync - {"self"} <= get_async + set_sync = set(inspect.signature(Array.set_basic_selection).parameters) + set_async = set(inspect.signature(Array.setitem_async).parameters) + assert set_sync - {"self"} <= set_async diff --git a/tests/test_sync.py b/tests/test_sync.py index c5eadb0f4f..73b2d8733e 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -2,9 +2,12 @@ from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, patch +import numpy as np import pytest import zarr +from zarr.abc.store import ByteRequest +from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.sync import ( SyncError, SyncMixin, @@ -15,6 +18,7 @@ loop, sync, ) +from zarr.storage import MemoryStore, WrapperStore @pytest.fixture(params=[True, False]) @@ -163,3 +167,66 @@ def test_cleanup_resources_idempotent() -> None: _get_executor() # trigger resource creation (iothread, loop, thread-pool) cleanup_resources() cleanup_resources() + + +class LoopBoundStore(WrapperStore[MemoryStore]): + """A store whose I/O only works on the event loop that first drove it. + + Mimics fsspec/aiohttp-backed stores, whose sessions and connectors are + lazily bound to whichever event loop first performs I/O. 
+ """ + + _bound_loop: asyncio.AbstractEventLoop | None = None + + def _check_loop(self) -> None: + running = asyncio.get_running_loop() + if self._bound_loop is None: + self._bound_loop = running + elif running is not self._bound_loop: + raise RuntimeError("store I/O driven from a different event loop than it is bound to") + + async def get( + self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: + self._check_loop() + return await super().get(key, prototype, byte_range) + + async def set(self, key: str, value: Buffer) -> None: + self._check_loop() + await super().set(key, value) + + +@pytest.mark.skipif( + not hasattr(zarr.Array, "getitem_async"), + reason="Array.*_async methods do not exist on this version", +) +@pytest.mark.xfail( + reason=( + "Design-independent limitation: a store with event-loop affinity binds to " + "zarr's background loop during sync create/open, so awaiting Array.*_async " + "from a different user-owned loop trips the affinity check. Orthogonal to " + "keeping vs. removing AsyncArray; tracked separately." + ), + strict=True, +) +def test_array_async_methods_with_loop_bound_store() -> None: + # The supported pattern is: open an array synchronously (its I/O runs on + # zarr's internal background loop), then await the array's *_async methods + # from user-owned async code (a different loop). Stores with loop affinity + # must keep working across that boundary. + store = LoopBoundStore(MemoryStore()) + z = zarr.create_array(store, shape=(8,), chunks=(4,), dtype="i4") + z[:] = np.arange(8, dtype="i4") + + async def read() -> object: + return await z.getitem_async(slice(2, 6)) + + # Drive a user-owned loop explicitly and close it in `finally`, so the + # expected RuntimeError (xfail) doesn't leave an unclosed loop behind for + # the GC to report as an unraisable exception. 
+ user_loop = asyncio.new_event_loop() + try: + result = user_loop.run_until_complete(read()) + finally: + user_loop.close() + np.testing.assert_array_equal(np.arange(2, 6, dtype="i4"), result) From c7312e1f5bad0076c3be54f9e5aad89bc9049c35 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:53:32 -0400 Subject: [PATCH 2/3] docs: changelog entry --- changes/4049.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/4049.feature.md diff --git a/changes/4049.feature.md b/changes/4049.feature.md new file mode 100644 index 0000000000..91a2f9d350 --- /dev/null +++ b/changes/4049.feature.md @@ -0,0 +1 @@ +Add a public asynchronous API to `zarr.Array`. Every synchronous `Array` method now has an awaitable `*_async` twin (`getitem_async`, `setitem_async`, `get_basic_selection_async`, `resize_async`, `append_async`, `update_attributes_async`, `nchunks_initialized_async`, and the other selection variants), so async callers can await array operations directly instead of reaching into `Array.async_array`. The `*_async` twins expose the same parameters as their synchronous counterparts; as part of this, `Array.resize` now accepts `delete_outside_chunks` (previously only reachable through the async surface). 
From 621f4770adb539756dfe15eb303034367b460b13 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:55:29 -0400 Subject: [PATCH 3/3] fix: matching methods --- src/zarr/core/array.py | 8 +++++-- tests/test_array.py | 54 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index cddae9228e..05dda29365 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -3769,7 +3769,7 @@ def blocks(self) -> BlockIndex: examples.""" return BlockIndex(self) - def resize(self, new_shape: ShapeLike) -> None: + def resize(self, new_shape: ShapeLike, delete_outside_chunks: bool = True) -> None: """ Change the shape of the array by growing or shrinking one or more dimensions. This is an in-place operation that modifies the array. @@ -3778,6 +3778,10 @@ def resize(self, new_shape: ShapeLike) -> None: ---------- new_shape : tuple New shape of the array. + delete_outside_chunks : bool, optional + If True (default), chunks that fall entirely outside the new array + shape are deleted from the underlying store. If False, those chunks + are left in place. Notes ----- @@ -3805,7 +3809,7 @@ def resize(self, new_shape: ShapeLike) -> None: #>(50, 50) ``` """ - sync(self.resize_async(new_shape)) + sync(self.resize_async(new_shape, delete_outside_chunks=delete_outside_chunks)) def append(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: """Append `data` to `axis`. 
diff --git a/tests/test_array.py b/tests/test_array.py index c4e054f384..a542315c1c 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -7,6 +7,7 @@ import re import sys import warnings +from collections.abc import Callable from itertools import accumulate, starmap from typing import TYPE_CHECKING, Any, Literal from unittest import mock @@ -2416,6 +2417,59 @@ def test_array_async_twin_exists(name: str) -> None: assert inspect.iscoroutinefunction(twin), f"Array.{name}_async must be a coroutine function" +def _param_defaults(fn: Callable[..., Any]) -> dict[str, object]: + """Map each non-``self`` parameter of ``fn`` to its default (or ``inspect.Parameter.empty``).""" + return { + name: param.default + for name, param in inspect.signature(fn).parameters.items() + if name != "self" + } + + +# Twins whose sync counterpart is the ``__getitem__``/``__setitem__`` dunder. +# The dunder exposes a deliberately narrower surface than the async twin, so we +# compare against the basic-selection method and only require that the twin +# loses none of its parameters (subset, not equality). +_DUNDER_TWIN_SYNC_SOURCE = { + "getitem": "get_basic_selection", + "setitem": "set_basic_selection", +} + + +@pytest.mark.parametrize("name", _ASYNC_TWIN_METHODS) +def test_array_async_twin_signature_matches_sync(name: str) -> None: + """An ``*_async`` twin must expose the same call surface as its sync method. + + Existence alone (see [`test_array_async_twin_exists`]) does not stop a twin + from drifting: a parameter present on one side but not the other lets a + capability silently disappear from one API. This pins parameter names *and* + defaults so the sync and async surfaces stay in lockstep. 
+ """ + async_params = _param_defaults(getattr(Array, f"{name}_async")) + + if name in _DUNDER_TWIN_SYNC_SOURCE: + sync_source = _param_defaults(getattr(Array, _DUNDER_TWIN_SYNC_SOURCE[name])) + missing = set(sync_source) - set(async_params) + assert not missing, ( + f"Array.{name}_async drops parameters {missing} exposed by the sync selection surface" + ) + return + + sync_attr = inspect.getattr_static(Array, name) + if isinstance(sync_attr, property): + sync_fn = sync_attr.fget + assert sync_fn is not None, f"Array.{name} property has no getter" + else: + sync_fn = sync_attr + sync_params = _param_defaults(sync_fn) + + assert sync_params == async_params, ( + f"Array.{name} and Array.{name}_async have diverging signatures:\n" + f" sync : {sync_params}\n" + f" async: {async_params}" + ) + + def test_array_friendly_constructor_keeps_asyncarray() -> None: """The friendly constructor builds (and holds) an AsyncArray internally.