Generalize handling of chunked array types #7019

Merged
merged 189 commits into from
May 18, 2023
Changes from 175 commits
Commits
15fc2b8
generalise chunk methods to allow cubed
TomNicholas Sep 10, 2022
5e05b71
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2022
cff89ee
fic typing typo
TomNicholas Sep 10, 2022
039973b
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Sep 10, 2022
60d44bc
fixed circular import
TomNicholas Sep 11, 2022
5ddba7e
fix some mypy errors
TomNicholas Sep 11, 2022
37d0d66
added cubed to mypy ignore list
TomNicholas Sep 11, 2022
67d7efc
Merge branch 'main' into cubed_integration
TomNicholas Sep 11, 2022
cdcb3fb
simplify __array_ufunc__ check
TomNicholas Sep 12, 2022
73e4563
Revert "simplify __array_ufunc__ check" as I pushed to wrong branch
TomNicholas Sep 12, 2022
5995685
update cubed array type
TomNicholas Sep 20, 2022
46223ae
Merge branch 'main' into cubed_integration
TomNicholas Sep 20, 2022
320b09f
fix missed conflict
TomNicholas Sep 20, 2022
3facfd6
sketch for ChunkManager adapter class
TomNicholas Sep 20, 2022
c616a85
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2022
ecabaa4
Remove erroneous docstring about usage of map_blocks
TomNicholas Sep 23, 2022
e53a588
apply_ufunc -> apply_gufunc
TomNicholas Sep 23, 2022
fe21edd
chunk -> from_array
TomNicholas Sep 23, 2022
3f6aedc
remove staticmethods
TomNicholas Sep 23, 2022
ea8f482
attempt to type methods of ABC
TomNicholas Sep 23, 2022
c49ab8e
from_array
TomNicholas Sep 23, 2022
26d1868
attempt to specify types
TomNicholas Sep 23, 2022
e9b4a33
method for checking array type
TomNicholas Sep 26, 2022
3a43b00
Merge branch 'main' into pr/7019
Illviljan Oct 27, 2022
c7c9589
Update pyproject.toml
Illviljan Oct 27, 2022
fc051e3
Merge branch 'main' into cubed_integration
TomNicholas Jan 21, 2023
56e9d0f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 21, 2023
3b16cca
fixed import errors
TomNicholas Jan 22, 2023
7ac3323
generalize .chunk method kwargs
TomNicholas Jan 22, 2023
e732b87
used dask functions in dask chunkmanager
TomNicholas Jan 22, 2023
68930eb
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Jan 22, 2023
8442e1f
define signatures for apply_gufunc, blockwise, map_blocks
TomNicholas Jan 22, 2023
3717431
prototype function to detect which parallel backend to use
TomNicholas Jan 22, 2023
78d8969
Merge branch 'main' into cubed_integration
TomNicholas Mar 6, 2023
7ac6531
add cubed.apply_gufunc
TomNicholas Mar 6, 2023
e423bfb
ruffify
TomNicholas Mar 6, 2023
149db9d
add rechunk and compute methods for cubed
TomNicholas Mar 6, 2023
280c563
xr.apply_ufunc now dispatches to chunkmanager.apply_gufunc
TomNicholas Mar 6, 2023
42186e7
CubedManager.chunks
TomNicholas Mar 6, 2023
103a755
attempt to keep dask and cubed imports lazy
TomNicholas Mar 6, 2023
f2bce3d
generalize idxmax
TomNicholas Mar 6, 2023
f09947d
move unify_chunks import to ChunkManager
TomNicholas Mar 6, 2023
e760f10
generalize Dataset.load()
TomNicholas Mar 6, 2023
b1a4e35
check explicitly for chunks attribute instead of hard-coding cubed
TomNicholas Mar 6, 2023
5320f4d
better function names
TomNicholas Mar 6, 2023
45ed5d2
add cubed version of unify_chunks
TomNicholas Mar 6, 2023
eec096b
recognize wrapped duck dask arrays (e.g. pint wrapping dask)
TomNicholas Mar 6, 2023
c64ff5f
add some tests for fetching ChunkManagers
TomNicholas Mar 7, 2023
8a37905
add from_array_kwargs to open_dataset
TomNicholas Mar 9, 2023
989d6bb
add from_array_kwargs to open_zarr
TomNicholas Mar 9, 2023
8c7fe79
pipe constructors through chunkmanager
TomNicholas Mar 9, 2023
0222b55
generalize map_blocks inside coding
TomNicholas Mar 9, 2023
9d6cf6b
Merge branch 'main' into cubed_integration
TomNicholas Mar 13, 2023
afc6abc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 13, 2023
2c0cc26
fixed full_like
TomNicholas Mar 13, 2023
1a255cf
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Mar 13, 2023
c398d98
add from_array_kwargs to open_zarr
TomNicholas Mar 13, 2023
598bf12
don't import dask.tokenize
TomNicholas Mar 14, 2023
7bef188
fix bugs with passing from_array_kwargs down
TomNicholas Mar 14, 2023
7af5395
generalise reductions by adding to chunkmanager
TomNicholas Mar 14, 2023
287e96c
moved nanfirst/nanlast to duck_array_ops from dask_array_ops
TomNicholas Mar 14, 2023
8bbc141
generalize interp
TomNicholas Mar 14, 2023
6cfe9fa
generalized chunk_hint function inside indexing
TomNicholas Mar 14, 2023
4ca044b
DaskIndexingAdapter->ChunkedIndexingAdapter
TomNicholas Mar 14, 2023
8ed5ed6
Merge branch 'main' into cubed_integration
TomNicholas Mar 14, 2023
2a4c38b
Revert "DaskIndexingAdapter->ChunkedIndexingAdapter"
TomNicholas Mar 14, 2023
45a4c98
pass cubed-related kwargs down through to_zarr by adding .store to Ch…
TomNicholas Mar 14, 2023
dee5b33
fix typing_extensions on py3.9
TomNicholas Mar 14, 2023
176d7fa
fix ImportError with cubed array type
TomNicholas Mar 14, 2023
9e58d6d
give up trying to import TypeAlias in CI
TomNicholas Mar 14, 2023
a6219a0
fix import of T_Chunks
TomNicholas Mar 14, 2023
9f21994
fix no_implicit_optional warnings
TomNicholas Mar 14, 2023
eb7bb0b
don't define CubedManager if cubed can't be imported
TomNicholas Mar 14, 2023
57733de
fix local mypy errors
TomNicholas Mar 14, 2023
4c58b28
don't explicitly pass enforce_ndim into dask.array.map_blocks
TomNicholas Mar 15, 2023
d07830c
fix drop_axis default
TomNicholas Mar 15, 2023
c1bf040
Merge branch 'main' into cubed_integration
TomNicholas Mar 16, 2023
3ae21d9
use indexing adapter on cubed arrays too
TomNicholas Mar 16, 2023
7ef0129
use array API-compatible version of astype function
TomNicholas Mar 16, 2023
ec22963
whatsnew
TomNicholas Mar 16, 2023
4c8d773
document new kwargs
TomNicholas Mar 16, 2023
f4de577
add chunkmanager entrypoint
TomNicholas Mar 21, 2023
1cd7283
move CubedManager to a separate package
TomNicholas Mar 21, 2023
5386711
guess chunkmanager based on whats available
TomNicholas Mar 21, 2023
6b173de
Merge branch 'main' into cubed_integration
TomNicholas Mar 22, 2023
c431a5f
fix bug with tokenizing
TomNicholas Mar 22, 2023
7ab9047
adapt tests to emulate existence of entrypoint
TomNicholas Mar 22, 2023
72f8f5f
use fixture to setup/teardown dummy entrypoint
TomNicholas Mar 22, 2023
34c6aea
refactor to make DaskManager unavailable if dask not installed
TomNicholas Mar 22, 2023
fb9466d
typing
TomNicholas Mar 22, 2023
ffd2e21
Merge branch 'main' into cubed_integration
TomNicholas Mar 22, 2023
36b2be0
move whatsnew to latest xarray version
TomNicholas Mar 22, 2023
77a1e4e
remove superfluous lines from whatsnew
TomNicholas Mar 22, 2023
a6222f9
fix bug where zarr backend attempted to use dask when not installed
TomNicholas Mar 23, 2023
61fe236
Remove rogue print statement
TomNicholas Mar 23, 2023
447d1f1
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Mar 23, 2023
a7a6a6e
Clarify what's new
TomNicholas Mar 23, 2023
aa64996
use monkeypatch to mock registering of dummy chunkmanager
TomNicholas Mar 23, 2023
fec1a13
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Mar 23, 2023
db11947
more tests for guessing chunkmanager correctly
TomNicholas Mar 23, 2023
2c18df6
raise TypeError if no chunkmanager found for array types
TomNicholas Mar 23, 2023
2e49154
Correct is_chunked_array check
TomNicholas Mar 23, 2023
748e90d
vendor dask.array.core.normalize_chunks
TomNicholas Mar 24, 2023
70804f4
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas Mar 24, 2023
dae2fe4
add default implementation of rechunk in ABC
TomNicholas Mar 24, 2023
4ef500c
remove cubed-specific type check in daskmanager
TomNicholas Mar 24, 2023
ba66419
nanfirst->chunked_nanfirst
TomNicholas Mar 24, 2023
7fd4617
revert adding cubed to NON_NUMPY_SUPPORTED_ARRAY_TYPES
TomNicholas Mar 24, 2023
69d77c9
licensing to vendor functions from dask
TomNicholas Mar 24, 2023
8337857
fix bug
TomNicholas Mar 24, 2023
9850a46
ignore mypy error
TomNicholas Mar 24, 2023
488fd5b
separate chunk_manager kwarg from from_array_kwargs dict
TomNicholas Mar 29, 2023
00bcf6c
rename kwarg to chunked_array_type
TomNicholas Mar 29, 2023
ff1f8ab
Merge branch 'main' into cubed_integration
TomNicholas Mar 30, 2023
844726d
refactor from_array_kwargs in .chunk ready for deprecation
TomNicholas Apr 4, 2023
3d56a3d
print statements in test so I can comment on them
TomNicholas Apr 5, 2023
1952c55
remove print statements now I've commented on them in PR
TomNicholas Apr 5, 2023
3ba8d42
should fix dask naming tests
TomNicholas Apr 5, 2023
b15411c
Merge branch 'main' into cubed_integration
TomNicholas Apr 5, 2023
53d6094
make dask-specific kwargs explicit in from_array
TomNicholas Apr 5, 2023
7dc6581
debugging print statements
TomNicholas Apr 6, 2023
fcaf499
Revert "debugging print statements"
TomNicholas Apr 6, 2023
64df7e8
fix gnarly bug with auto-determining chunksizes caused by not referri…
TomNicholas Apr 6, 2023
747ada5
hopefully fix broken docstring
TomNicholas Apr 6, 2023
9b33ab7
Revert "make dask-specific kwargs explicit in from_array"
TomNicholas Apr 6, 2023
c8d5aa1
Merge branch 'main' into cubed_integration
TomNicholas Apr 6, 2023
6a7a043
show chunksize limit used in failing tests
TomNicholas Apr 6, 2023
20f92c6
move lazy indexing adapter up out of chunkmanager code
TomNicholas Apr 6, 2023
796a577
try upgrading minimum version of dask
TomNicholas Apr 6, 2023
29d0c92
Revert "try upgrading minimum version of dask"
TomNicholas Apr 11, 2023
031017b
un-vendor dask.array.core.normalize_chunks
TomNicholas Apr 11, 2023
a8c3413
Merge branch 'main' into cubed_integration
TomNicholas Apr 11, 2023
14a1226
refactored to all passing ChunkManagerEntrypoint objects directly
TomNicholas Apr 12, 2023
5dd9d35
Remove redundant Nones from types
TomNicholas Apr 17, 2023
5a46294
From future import annotations
TomNicholas Apr 17, 2023
d6b56c6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
471d22a
From functools import annotations
TomNicholas Apr 17, 2023
8378f43
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
f8b1020
From future import annotations
TomNicholas Apr 17, 2023
907c15b
Merge branch 'main' into cubed_integration
TomNicholas Apr 17, 2023
11676ab
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
76ce09e
defined type for NormalizedChunks
TomNicholas May 1, 2023
127c184
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 1, 2023
604bbf3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 1, 2023
7604594
standardized capitalization of ChunkManagerEntrypoint
TomNicholas May 1, 2023
97537dd
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 1, 2023
026fe17
Merge branch 'main' into cubed_integration
TomNicholas May 1, 2023
355555f
ensure ruff doesn't remove import
TomNicholas May 1, 2023
6eac87a
ignore remaining typing errors stemming from unclear dask typing for …
TomNicholas May 2, 2023
f4224f6
Merge branch 'main' into cubed_integration
TomNicholas May 2, 2023
4ec8370
rename store_kwargs->chunkmanager_store_kwargs
TomNicholas May 2, 2023
316c63d
missed return value
TomNicholas May 2, 2023
9cd9078
array API fixes for astype
TomNicholas May 4, 2023
5dc2016
Revert "array API fixes for astype"
TomNicholas May 4, 2023
995eb5a
Merge branch 'main' into cubed_integration
dcherian May 10, 2023
c8b9ee7
Apply suggestions from code review
Illviljan May 12, 2023
5c95758
Update xarray/tests/test_parallelcompat.py
Illviljan May 12, 2023
a61a30a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 12, 2023
ea35a32
overridden -> subclassed
TomNicholas May 12, 2023
e68b327
from_array_kwargs is optional
TomNicholas May 12, 2023
956c055
ensured all compute calls go through chunkmanager
TomNicholas May 12, 2023
42ad08e
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 12, 2023
cf0c28e
Raise if multiple chunkmanagers recognize array type
TomNicholas May 12, 2023
4f2ec27
from_array_kwargs is optional
TomNicholas May 13, 2023
5f2f569
from_array_kwargs is optional
TomNicholas May 13, 2023
929db33
from_array_kwargs is optional
TomNicholas May 13, 2023
876f81c
from_array_kwargs is optional
TomNicholas May 13, 2023
ad0a706
from_array_kwargs is optional
TomNicholas May 13, 2023
115b52b
fixes for chunk methods
TomNicholas May 13, 2023
8741eec
Merge branch 'main' into cubed_integration
TomNicholas May 13, 2023
a1ba4f0
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 13, 2023
bdf7600
correct readme to reflect fact we aren't vendoring dask in this PR an…
TomNicholas May 13, 2023
06bb508
update whatsnew
TomNicholas May 13, 2023
ba00558
more docstring corrections
TomNicholas May 13, 2023
6a99454
remove comment
TomNicholas May 13, 2023
95d81e8
Raise NotImplementedErrors in all abstract methods
TomNicholas May 16, 2023
e5e3096
type hints for every arg in ChunkManagerEntryPOint methods
TomNicholas May 16, 2023
a221436
more explicit typing + fixes for mypy errors revealed
TomNicholas May 16, 2023
fe2e9b3
Keyword-only arguments in full_like etc.
TomNicholas May 16, 2023
7bcaece
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 16, 2023
fecf7ed
None as default instead of {}
TomNicholas May 16, 2023
15dc44b
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 16, 2023
660ef41
fix bug apparently introduced by changing default type of drop_axis k…
TomNicholas May 16, 2023
e6d6f1f
Removed hopefully-unnecessary mypy ignore
TomNicholas May 16, 2023
c7fbe79
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 16, 2023
d728427
removed unnecessary mypy ignores
TomNicholas May 16, 2023
6b9fa3f
Merge branch 'cubed_integration' of https:/TomNicholas/xa…
TomNicholas May 16, 2023
51db5f2
change default value of drop_axis kwarg in map_blocks and catch when …
TomNicholas May 18, 2023
c69c563
fix checking of dask version in map_blocks
TomNicholas May 18, 2023
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Expand Up @@ -52,6 +52,11 @@ Documentation
Internal Changes
~~~~~~~~~~~~~~~~

- Experimental support for wrapping chunked array libraries other than dask.
A new ABC is defined - :py:class:`xr.core.parallelcompat.ChunkManagerEntrypoint` - which can be subclassed and then
registered by alternative chunked array implementations. (:issue:`6807`, :pull:`7019`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.


.. _whats-new.2023.04.2:

Expand Down
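The subclass-and-register pattern described in this what's-new entry can be sketched roughly as follows. This is a toy illustration only: `ToyChunkManager`, `ChunkedList`, `ChunkedListManager`, `REGISTRY`, and the method names are hypothetical stand-ins invented for this example, and they are not the actual interface of `ChunkManagerEntrypoint`.

```python
from abc import ABC, abstractmethod


class ToyChunkManager(ABC):
    """Hypothetical base class; NOT xarray's real ChunkManagerEntrypoint."""

    @abstractmethod
    def is_chunked_array(self, data) -> bool:
        """Return True if this manager knows how to handle `data`."""

    @abstractmethod
    def compute(self, data):
        """Turn a chunked object into an in-memory result."""


class ChunkedList:
    """Toy 'chunked array': a list of lists (assumption for illustration)."""

    def __init__(self, chunks):
        self.chunks = chunks


class ChunkedListManager(ToyChunkManager):
    """Concrete manager for the toy ChunkedList type."""

    def is_chunked_array(self, data) -> bool:
        return isinstance(data, ChunkedList)

    def compute(self, data):
        # Flatten all chunks into a single list.
        return [x for chunk in data.chunks for x in chunk]


# A plain dict stands in for whatever registration mechanism is used.
REGISTRY = {"chunkedlist": ChunkedListManager()}
```

Because the base class is abstract, a subclass that forgets to implement one of the methods cannot be instantiated, which is one reason an ABC suits this kind of plugin interface.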
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -39,6 +39,7 @@ module = [
"cf_units.*",
"cfgrib.*",
"cftime.*",
"cubed.*",
"cupy.*",
"fsspec.*",
"h5netcdf.*",
Expand Down
4 changes: 4 additions & 0 deletions setup.cfg
Expand Up @@ -132,6 +132,10 @@ xarray =
static/css/*
static/html/*

[options.entry_points]
xarray.chunkmanagers =
dask = xarray.core.daskmanager:DaskManager

[tool:pytest]
python_files = test_*.py
testpaths = xarray/tests properties
Expand Down
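For readers unfamiliar with entry points, the `xarray.chunkmanagers` group added to `setup.cfg` above can be inspected with the standard library. The sketch below is not how xarray itself loads chunk managers (that code is not shown here); the helper name `list_chunkmanager_entrypoints` is hypothetical, and the `EntryPoint` object is built by hand only to show how the `name = module:attr` line is parsed.

```python
from importlib.metadata import EntryPoint, entry_points

GROUP = "xarray.chunkmanagers"


def list_chunkmanager_entrypoints(group=GROUP):
    """Return the entry points advertised under `group` (hypothetical helper)."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: selectable entry point collection.
        return list(eps.select(group=group))
    # Python 3.9: plain dict mapping group name -> entry points.
    return list(eps.get(group, []))


# Hand-built mirror of the setup.cfg line, for illustration only.
example = EntryPoint(
    name="dask",
    value="xarray.core.daskmanager:DaskManager",
    group=GROUP,
)

for ep in list_chunkmanager_entrypoints():
    # Nothing is imported until ep.load() is called.
    print(ep.name, "->", ep.value)
```

The list is empty when no installed package advertises that group, so a third-party package would make itself discoverable by declaring its own entry in the same group.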
85 changes: 73 additions & 12 deletions xarray/backends/api.py
Expand Up @@ -6,7 +6,16 @@
from glob import glob
from io import BytesIO
from numbers import Number
from typing import TYPE_CHECKING, Any, Callable, Final, Literal, Union, cast, overload
from typing import (
TYPE_CHECKING,
Any,
Callable,
Final,
Literal,
Union,
cast,
overload,
)

import numpy as np

Expand All @@ -20,9 +29,11 @@
_nested_combine,
combine_by_coords,
)
from xarray.core.daskmanager import DaskManager
from xarray.core.dataarray import DataArray
from xarray.core.dataset import Dataset, _get_chunk, _maybe_chunk
from xarray.core.indexes import Index
from xarray.core.parallelcompat import guess_chunkmanager
from xarray.core.utils import is_remote_uri

if TYPE_CHECKING:
Expand All @@ -38,6 +49,7 @@
CompatOptions,
JoinOptions,
NestedSequence,
T_Chunks,
)

T_NetcdfEngine = Literal["netcdf4", "scipy", "h5netcdf"]
Expand All @@ -48,7 +60,6 @@
str, # no nice typing support for custom backends
None,
]
T_Chunks = Union[int, dict[Any, Any], Literal["auto"], None]
T_NetcdfTypes = Literal[
"NETCDF4", "NETCDF4_CLASSIC", "NETCDF3_64BIT", "NETCDF3_CLASSIC"
]
@@ -297,17 +308,27 @@ def _chunk_ds(
     chunks,
     overwrite_encoded_chunks,
     inline_array,
+    chunked_array_type,
+    from_array_kwargs,
     **extra_tokens,
 ):
-    from dask.base import tokenize
+    chunkmanager = guess_chunkmanager(chunked_array_type)
+
+    # TODO refactor to move this dask-specific logic inside the DaskManager class
+    if isinstance(chunkmanager, DaskManager):
+        from dask.base import tokenize

-    mtime = _get_mtime(filename_or_obj)
-    token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
-    name_prefix = f"open_dataset-{token}"
+        mtime = _get_mtime(filename_or_obj)
+        token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
+        name_prefix = "open_dataset-"
+    else:
+        # not used
+        token = (None,)
+        name_prefix = None

     variables = {}
     for name, var in backend_ds.variables.items():
-        var_chunks = _get_chunk(var, chunks)
+        var_chunks = _get_chunk(var, chunks, chunkmanager)
         variables[name] = _maybe_chunk(
             name,
             var,
@@ -316,6 +337,8 @@ def _chunk_ds(
             name_prefix=name_prefix,
             token=token,
             inline_array=inline_array,
+            chunked_array_type=chunkmanager,
+            from_array_kwargs=from_array_kwargs.copy(),
         )
     return backend_ds._replace(variables)

Expand All @@ -328,6 +351,8 @@ def _dataset_from_backend_dataset(
cache,
overwrite_encoded_chunks,
inline_array,
chunked_array_type,
from_array_kwargs,
**extra_tokens,
):
if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
Expand All @@ -346,6 +371,8 @@ def _dataset_from_backend_dataset(
chunks,
overwrite_encoded_chunks,
inline_array,
chunked_array_type,
from_array_kwargs,
**extra_tokens,
)

Expand Down Expand Up @@ -373,6 +400,8 @@ def open_dataset(
decode_coords: Literal["coordinates", "all"] | bool | None = None,
drop_variables: str | Iterable[str] | None = None,
inline_array: bool = False,
chunked_array_type: str | None = None,
from_array_kwargs: dict[str, Any] | None = None,
backend_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> Dataset:
Expand Down Expand Up @@ -465,6 +494,15 @@ def open_dataset(
itself, and each chunk refers to that task by its key. With
``inline_array=True``, Dask will instead inline the array directly
in the values of the task graph. See :py:func:`dask.array.from_array`.
chunked_array_type: str, optional
Which chunked array type to coerce this dataset's arrays to.
Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
Experimental API that should not be relied upon.
from_array_kwargs: dict
Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
For example if :py:func:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
backend_kwargs: dict
Additional keyword arguments passed on to the engine open function,
equivalent to `**kwargs`.
Expand Down Expand Up @@ -508,6 +546,9 @@ def open_dataset(
if engine is None:
engine = plugins.guess_engine(filename_or_obj)

if from_array_kwargs is None:
from_array_kwargs = {}

backend = plugins.get_backend(engine)

decoders = _resolve_decoders_kwargs(
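The handling of `from_array_kwargs` in this diff uses a `None` default that is replaced by `{}`, plus a `.copy()` for each variable in `_chunk_ds`. That follows a common Python idiom, sketched below in isolation. The function `chunk_all` and the `"name"` key it writes are hypothetical and exist only to show why the copy matters; they are not taken from xarray.

```python
def chunk_all(names, from_array_kwargs=None):
    """Build per-variable kwargs without sharing or mutating caller state."""
    # A None default avoids the shared-mutable-default pitfall of `={}`.
    if from_array_kwargs is None:
        from_array_kwargs = {}

    per_variable = {}
    for name in names:
        # Copy so that per-variable tweaks do not leak between variables
        # or back into the dict the caller passed in.
        kwargs = from_array_kwargs.copy()
        kwargs["name"] = f"chunked-{name}"  # hypothetical per-variable tweak
        per_variable[name] = kwargs
    return per_variable


user_kwargs = {"inline_array": True}
result = chunk_all(["a", "b"], user_kwargs)
```

After the call, `user_kwargs` is unchanged and each variable holds its own dict.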
Expand Down Expand Up @@ -536,6 +577,8 @@ def open_dataset(
cache,
overwrite_encoded_chunks,
inline_array,
chunked_array_type,
from_array_kwargs,
drop_variables=drop_variables,
**decoders,
**kwargs,
Expand All @@ -546,8 +589,8 @@ def open_dataset(
def open_dataarray(
filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
*,
engine: T_Engine = None,
chunks: T_Chunks = None,
engine: T_Engine | None = None,
chunks: T_Chunks | None = None,
cache: bool | None = None,
decode_cf: bool | None = None,
mask_and_scale: bool | None = None,
Expand All @@ -558,6 +601,8 @@ def open_dataarray(
decode_coords: Literal["coordinates", "all"] | bool | None = None,
drop_variables: str | Iterable[str] | None = None,
inline_array: bool = False,
chunked_array_type: str | None = None,
from_array_kwargs: dict[str, Any] | None = None,
backend_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> DataArray:
Expand Down Expand Up @@ -652,6 +697,15 @@ def open_dataarray(
itself, and each chunk refers to that task by its key. With
``inline_array=True``, Dask will instead inline the array directly
in the values of the task graph. See :py:func:`dask.array.from_array`.
chunked_array_type: str, optional
Which chunked array type to coerce the underlying data array to.
Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
Experimental API that should not be relied upon.
from_array_kwargs: dict
Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
For example if :py:func:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
backend_kwargs: dict
Additional keyword arguments passed on to the engine open function,
equivalent to `**kwargs`.
Expand Down Expand Up @@ -695,6 +749,8 @@ def open_dataarray(
cache=cache,
drop_variables=drop_variables,
inline_array=inline_array,
chunked_array_type=chunked_array_type,
from_array_kwargs=from_array_kwargs,
backend_kwargs=backend_kwargs,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
Expand Down Expand Up @@ -726,7 +782,7 @@ def open_dataarray(

def open_mfdataset(
paths: str | NestedSequence[str | os.PathLike],
chunks: T_Chunks = None,
chunks: T_Chunks | None = None,
concat_dim: str
| DataArray
| Index
Expand All @@ -736,7 +792,7 @@ def open_mfdataset(
| None = None,
compat: CompatOptions = "no_conflicts",
preprocess: Callable[[Dataset], Dataset] | None = None,
engine: T_Engine = None,
engine: T_Engine | None = None,
Contributor

Add from_array_kwargs here too?

Member Author

I actually don't think we need to - from_array_kwargs is only going to get directly passed down to open_dataset, and hence could be considered part of **kwargs.

This should actually just work, except in the case of parallel=True. For that we could add delayed to the ChunkManager ABC, so that if cubed does implement cubed.delayed it could be added, else a NotImplementedError would be raised. I think all of this wouldn't be necessary if we had lazy concatenation in xarray though (xref #4628). That suggestion would mean we should also replace other instances of dask.delayed in other parts of the codebase though... I think I will split this into a separate issue in the interests of getting this one merged.
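The idea floated in this comment, an optional `delayed` method on the base class that raises `NotImplementedError` unless a backend provides it, could look roughly like the sketch below. Everything here is hypothetical: `BaseManager`, `EagerOnlyManager`, and `LazyManager` are invented names, and the toy `delayed` simply returns a zero-argument callable rather than imitating `dask.delayed`.

```python
class BaseManager:
    """Hypothetical base class with an optional `delayed` hook."""

    def delayed(self, func):
        # Default: backends that cannot defer work say so explicitly.
        raise NotImplementedError(
            f"{type(self).__name__} does not implement delayed"
        )


class EagerOnlyManager(BaseManager):
    """A backend that does not support deferred execution."""


class LazyManager(BaseManager):
    """A backend that supports deferring a function call."""

    def delayed(self, func):
        def wrapper(*args, **kwargs):
            # Return a thunk; nothing runs until it is called.
            return lambda: func(*args, **kwargs)

        return wrapper


deferred_sum = LazyManager().delayed(sum)([1, 2, 3])
```

Calling `deferred_sum()` performs the work, while `EagerOnlyManager().delayed(sum)` raises immediately, which gives callers a clear signal that the `parallel=True` path is unavailable for that backend.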

data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
coords="different",
combine: Literal["by_coords", "nested"] = "by_coords",
Expand Down Expand Up @@ -1490,6 +1546,7 @@ def to_zarr(
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
chunkmanager_store_kwargs: dict[str, Any] | None = None,
) -> backends.ZarrStore:
...

Expand All @@ -1512,6 +1569,7 @@ def to_zarr(
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
chunkmanager_store_kwargs: dict[str, Any] | None = None,
) -> Delayed:
...

Expand All @@ -1531,6 +1589,7 @@ def to_zarr(
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
chunkmanager_store_kwargs: dict[str, Any] | None = None,
) -> backends.ZarrStore | Delayed:
"""This function creates an appropriate datastore for writing a dataset to
a zarr store
Expand Down Expand Up @@ -1652,7 +1711,9 @@ def to_zarr(
writer = ArrayWriter()
# TODO: figure out how to properly handle unlimited_dims
dump_to_store(dataset, zstore, writer, encoding=encoding)
writes = writer.sync(compute=compute)
writes = writer.sync(
compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
)

if compute:
_finalize_store(writes, zstore)
Expand Down
15 changes: 10 additions & 5 deletions xarray/backends/common.py
Expand Up @@ -11,7 +11,8 @@

from xarray.conventions import cf_encoder
from xarray.core import indexing
from xarray.core.pycompat import is_duck_dask_array
from xarray.core.parallelcompat import get_chunked_array_type
from xarray.core.pycompat import is_chunked_array
from xarray.core.utils import FrozenDict, NdimSizeLenMixin, is_remote_uri

if TYPE_CHECKING:
Expand Down Expand Up @@ -153,7 +154,7 @@ def __init__(self, lock=None):
self.lock = lock

def add(self, source, target, region=None):
if is_duck_dask_array(source):
if is_chunked_array(source):
self.sources.append(source)
self.targets.append(target)
self.regions.append(region)
@@ -163,21 +164,25 @@ def add(self, source, target, region=None):
             else:
                 target[...] = source

-    def sync(self, compute=True):
+    def sync(self, compute=True, chunkmanager_store_kwargs=None):
         if self.sources:
-            import dask.array as da
+            chunkmanager = get_chunked_array_type(*self.sources)

             # TODO: consider wrapping targets with dask.delayed, if this makes
             # for any discernible difference in performance, e.g.,
             # targets = [dask.delayed(t) for t in self.targets]

-            delayed_store = da.store(
+            if chunkmanager_store_kwargs is None:
+                chunkmanager_store_kwargs = {}
+
+            delayed_store = chunkmanager.store(
                 self.sources,
                 self.targets,
                 lock=self.lock,
                 compute=compute,
                 flush=True,
                 regions=self.regions,
+                **chunkmanager_store_kwargs,
             )
             self.sources = []
             self.targets = []
Expand Down
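The change to `sync` above replaces a hard-coded `dask.array.store` call with a lookup of whichever manager recognises the source arrays, followed by a call to that manager's `store`. A self-contained toy version of that dispatch is sketched below. `ToyManager`, `pick_manager`, and this `sync` are hypothetical; the use of `TypeError` when several managers match is an assumption, since the commit list only says that an error is raised in that case.

```python
class ToyManager:
    """Hypothetical manager treating a list of lists as a chunked array."""

    def is_chunked_array(self, data):
        return isinstance(data, list) and all(isinstance(c, list) for c in data)

    def store(self, sources, targets, **kwargs):
        # Write every chunk of each source into the matching target.
        for source, target in zip(sources, targets):
            target.extend(x for chunk in source for x in chunk)


def pick_manager(managers, *arrays):
    """Return the single manager that recognises all of `arrays`."""
    matches = [m for m in managers if all(m.is_chunked_array(a) for a in arrays)]
    if not matches:
        raise TypeError("no registered manager recognises these arrays")
    if len(matches) > 1:
        # Assumption: the exception type for this case is a guess.
        raise TypeError("multiple managers recognise these arrays")
    return matches[0]


def sync(managers, sources, targets, store_kwargs=None):
    """Dispatch the write to the manager inferred from the sources."""
    if not sources:
        return
    if store_kwargs is None:
        store_kwargs = {}
    pick_manager(managers, *sources).store(sources, targets, **store_kwargs)


target = []
sync([ToyManager()], sources=[[[1, 2], [3]]], targets=[target])
```

Inferring the manager from the arrays, rather than from a user-supplied name, means the write path needs no extra argument to work with a non-dask backend.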
2 changes: 1 addition & 1 deletion xarray/backends/plugins.py
Expand Up @@ -146,7 +146,7 @@ def refresh_engines() -> None:

def guess_engine(
store_spec: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
):
) -> str | type[BackendEntrypoint]:
engines = list_engines()

for engine, backend in engines.items():
Expand Down
23 changes: 21 additions & 2 deletions xarray/backends/zarr.py
Expand Up @@ -19,6 +19,7 @@
)
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.parallelcompat import guess_chunkmanager
from xarray.core.pycompat import integer_types
from xarray.core.utils import (
FrozenDict,
Expand Down Expand Up @@ -716,6 +717,8 @@ def open_zarr(
decode_timedelta=None,
use_cftime=None,
zarr_version=None,
chunked_array_type: str | None = None,
from_array_kwargs: dict[str, Any] | None = None,
**kwargs,
):
"""Load and decode a dataset from a Zarr store.
Expand Down Expand Up @@ -800,6 +803,15 @@ def open_zarr(
The desired zarr spec version to target (currently 2 or 3). The default
of None will attempt to determine the zarr version from ``store`` when
possible, otherwise defaulting to 2.
chunked_array_type: str, optional
Which chunked array type to coerce this dataset's arrays to.
Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
Experimental API that should not be relied upon.
from_array_kwargs: dict, optional
Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
For example, if :py:func:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.

Returns
-------
Expand All @@ -817,12 +829,17 @@ def open_zarr(
"""
from xarray.backends.api import open_dataset

if from_array_kwargs is None:
from_array_kwargs = {}

if chunks == "auto":
try:
Contributor

Can we handle the import error in guess_chunkmanager by returning None? In the future, this can become an explicit error

import dask.array # noqa
guess_chunkmanager(
chunked_array_type
) # attempt to import that parallel backend

chunks = {}
except ImportError:
except ValueError:
chunks = None

if kwargs:
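The `chunks == "auto"` branch above now asks for a chunk manager and falls back to `chunks = None` when that lookup raises `ValueError`. The sketch below mimics that control flow with stand-in functions. `guess_manager`, `resolve_auto_chunks`, and the dict-based registry are hypothetical, and the rule used when no name is given (prefer `"dask"`, else the first registered entry) is an assumption based on the docstring wording.

```python
def guess_manager(name, registry):
    """Look up a manager by name, or pick a default (hypothetical logic)."""
    if name is None:
        if not registry:
            raise ValueError("no chunk managers are available")
        # Assumed default: prefer dask, else whatever is registered first.
        if "dask" in registry:
            return registry["dask"]
        return next(iter(registry.values()))
    if name not in registry:
        raise ValueError(f"unrecognized chunk manager {name!r}")
    return registry[name]


def resolve_auto_chunks(chunks, name, registry):
    """Mirror the try/except fallback used for chunks='auto'."""
    if chunks == "auto":
        try:
            guess_manager(name, registry)  # only checks availability
            return {}
        except ValueError:
            return None
    return chunks
```

With an empty registry, `"auto"` resolves to `None` and loading falls back to non-chunked arrays; with a manager registered, it resolves to `{}`.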
Expand Down Expand Up @@ -851,6 +868,8 @@ def open_zarr(
engine="zarr",
chunks=chunks,
drop_variables=drop_variables,
chunked_array_type=chunked_array_type,
from_array_kwargs=from_array_kwargs,
backend_kwargs=backend_kwargs,
decode_timedelta=decode_timedelta,
use_cftime=use_cftime,
Expand Down