From 78136a452736a6eada7221ff6e8faf4d7dd20a27 Mon Sep 17 00:00:00 2001 From: Damien Ayers Date: Wed, 9 Nov 2022 10:56:25 +1100 Subject: [PATCH 1/2] Refactor and support changing S3 Endpoint See #470 --- .pre-commit-config.yaml | 14 +- apps/cloud/odc/apps/cloud/s3_find.py | 7 +- apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py | 8 +- libs/cloud/odc/aio/__init__.py | 599 +------------------- libs/cloud/odc/aio/_impl.py | 589 +++++++++++++++++++ libs/cloud/odc/azure/__init__.py | 84 +-- libs/cloud/odc/azure/_impl.py | 84 +++ 7 files changed, 716 insertions(+), 669 deletions(-) create mode 100644 libs/cloud/odc/aio/_impl.py create mode 100644 libs/cloud/odc/azure/_impl.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aa8bc4b94..bdac34d31 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,6 +16,16 @@ repos: - id: requirements-txt-fixer - id: check-added-large-files - id: check-merge-conflict + - repo: https://github.com/pycqa/isort + rev: 5.10.1 + hooks: + - id: isort + name: isort (python) + args: [ "--profile", "black", "--filter-files" ] + - repo: https://github.com/psf/black + rev: 22.10.0 + hooks: + - id: black - repo: https://github.com/pre-commit/mirrors-pylint rev: 'v3.0.0a5' # Use the sha / tag you want to point at hooks: @@ -24,7 +34,3 @@ repos: rev: '5.0.4' hooks: - id: flake8 - - repo: https://github.com/psf/black - rev: 22.10.0 - hooks: - - id: black diff --git a/apps/cloud/odc/apps/cloud/s3_find.py b/apps/cloud/odc/apps/cloud/s3_find.py index baae47f17..70873efe5 100755 --- a/apps/cloud/odc/apps/cloud/s3_find.py +++ b/apps/cloud/odc/apps/cloud/s3_find.py @@ -1,6 +1,7 @@ -import click import sys -from odc.aio import s3_find_glob, S3Fetcher + +import click +from odc.aio import S3Fetcher, s3_find_glob @click.command("s3-find") @@ -56,7 +57,7 @@ def cli(uri, skip_check, no_sign_request=None, request_payer=False): except ValueError as ve: click.echo(str(ve), err=True) sys.exit(1) - except Exception as e: + except Exception as e: #pylint:disable=broad-except click.echo(str(e), err=True) sys.exit(1) diff --git a/apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py index 207fd7623..936c91a21 100755 --- a/apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py @@ -9,14 +9,14 @@ import click from datacube import Datacube from datacube.index.hl import Doc2Dataset - - from odc.aio import S3Fetcher, s3_find_glob +from odc.apps.dc_tools._docs import parse_doc_stream +from odc.apps.dc_tools._stac import stac_transform, stac_transform_absolute from odc.apps.dc_tools.utils import ( IndexingException, SkippedException, - archive_less_mature, allow_unsafe, + archive_less_mature, fail_on_missing_lineage, index_update_dataset, no_sign_request, @@ -31,8 +31,6 @@ update_if_exists, verify_lineage, ) -from odc.apps.dc_tools._docs import parse_doc_stream -from odc.apps.dc_tools._stac import stac_transform, stac_transform_absolute # Grab the URL from the resulting S3 item diff --git a/libs/cloud/odc/aio/__init__.py b/libs/cloud/odc/aio/__init__.py index 3191fce85..990099024 100644 --- a/libs/cloud/odc/aio/__init__.py +++ b/libs/cloud/odc/aio/__init__.py @@ -1,580 +1,27 @@ -from ..cloud._version import __version__ -import logging - -import botocore -import aiobotocore -from aiobotocore.session import get_session -from aiobotocore.config import AioConfig -from botocore.exceptions import ClientError, BotoCoreError -import asyncio -from fnmatch import fnmatch -from types import SimpleNamespace 
-from typing import Optional, Iterator, Any - -from odc.aws import ( +from odc.cloud._version import __version__ +from ._impl import ( + S3Fetcher, auto_find_region, + s3_dir, + s3_dir_dir, + s3_file_info, + s3_find, + s3_find_glob, + s3_head_object, s3_url_parse, - s3_fmt_range, - _aws_unsigned_check_env, + s3_walker, ) -from odc.aws._find import norm_predicate, s3_file_info, parse_query -from odc.ppt import EOS_MARKER, future_results -from odc.ppt.async_thread import AsyncThread - - -async def s3_fetch_object(url, s3, range=None, **kw): - """returns object with - - On success: - .url = url - .data = bytes - .last_modified -- last modified timestamp - .range = None | (in,out) - .error = None - - On failure: - .url = url - .data = None - .last_modified = None - .range = None | (in, out) - .error = str| botocore.Exception class - """ - - def result(data=None, last_modified=None, error=None): - return SimpleNamespace( - url=url, data=data, error=error, last_modified=last_modified, range=range - ) - - bucket, key = s3_url_parse(url) - extra_args = dict(**kw) - - if range is not None: - try: - extra_args["Range"] = s3_fmt_range(range) - except Exception: - return result(error="Bad range passed in: " + str(range)) - - try: - obj = await s3.get_object(Bucket=bucket, Key=key, **extra_args) - stream = obj.get("Body", None) - if stream is None: - return result(error="Missing Body in response") - async with stream: - data = await stream.read() - except (ClientError, BotoCoreError) as e: - return result(error=e) - except Exception as e: - return result(error="Some Error: " + str(e)) - - last_modified = obj.get("LastModified", None) - return result(data=data, last_modified=last_modified) - - -async def _s3_find_via_cbk(url, cbk, s3, pred=None, glob=None, **kw): - """List all objects under certain path - - each s3 object is represented by a SimpleNamespace with attributes: - - url - - size - - last_modified - - etag - """ - pred = norm_predicate(pred=pred, glob=glob) - - bucket, prefix = s3_url_parse(url) - - if len(prefix) > 0 and not prefix.endswith("/"): - prefix = prefix + "/" - - pp = s3.get_paginator("list_objects_v2") - - n_total, n = 0, 0 - - async for o in pp.paginate(Bucket=bucket, Prefix=prefix, **kw): - for f in o.get("Contents", []): - n_total += 1 - f = s3_file_info(f, bucket) - if pred is None or pred(f): - n += 1 - await cbk(f) - - return n_total, n - - -async def s3_find(url, s3, pred=None, glob=None, **kw): - """List all objects under certain path - - each s3 object is represented by a SimpleNamespace with attributes: - - url - - size - - last_modified - - etag - """ - _files = [] - - async def on_file(f): - _files.append(f) - - await _s3_find_via_cbk(url, on_file, s3=s3, pred=pred, glob=glob, **kw) - - return _files - - -async def s3_head_object(url, s3, **kw): - """Run head_object return Result or Error - - (Result, None) -- on success - (None, error) -- on failure - - """ - - def unpack(url, rr): - return SimpleNamespace( - url=url, - size=rr.get("ContentLength", 0), - etag=rr.get("ETag", ""), - last_modified=rr.get("LastModified"), - expiration=rr.get("Expiration"), - ) - - bucket, key = s3_url_parse(url) - try: - rr = await s3.head_object(Bucket=bucket, Key=key, **kw) - except (ClientError, BotoCoreError) as e: - return (None, e) - - return (unpack(url, rr), None) - - -async def s3_dir(url, s3, pred=None, glob=None, **kw): - """List s3 "directory" without descending into sub directories. 
- - pred: predicate for file objects file_info -> True|False - glob: glob pattern for files only - - Returns: (dirs, files) - - where - dirs -- list of subdirectories in `s3://bucket/path/` format - - files -- list of objects with attributes: url, size, last_modified, etag - """ - bucket, prefix = s3_url_parse(url) - pred = norm_predicate(pred=pred, glob=glob) - - if len(prefix) > 0 and not prefix.endswith("/"): - prefix = prefix + "/" - - pp = s3.get_paginator("list_objects_v2") - - _dirs = [] - _files = [] - - async for o in pp.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/", **kw): - for d in o.get("CommonPrefixes", []): - d = d.get("Prefix") - _dirs.append("s3://{}/{}".format(bucket, d)) - for f in o.get("Contents", []): - f = s3_file_info(f, bucket) - if pred is None or pred(f): - _files.append(f) - - return _dirs, _files - - -async def s3_dir_dir(url, depth, dst_q, s3, pred=None, **kw): - """Find directories certain depth from the base, push them to the `dst_q` - - ``` - s3://bucket/a - |- b1 - |- c1/... - |- c2/... - |- some_file.txt - |- b2 - |- c3/... - ``` - - Given a bucket structure above, calling this function with - - - url s3://bucket/a/ - - depth=1 will produce - - s3://bucket/a/b1/ - - s3://bucket/a/b2/ - - depth=2 will produce - - s3://bucket/a/b1/c1/ - - s3://bucket/a/b1/c2/ - - s3://bucket/a/b2/c3/ - - Any files are ignored. - - If `pred` is supplied it is expected to be a `str -> bool` mapping, on - input full path of the sub-directory is given (e.g `a/b1/`) starting from - root, but not including bucket name. Sub-directory is only traversed - further if predicate returns True. - """ - if not url.endswith("/"): - url = url + "/" - - if depth == 0: - await dst_q.put(url) - return - - pp = s3.get_paginator("list_objects_v2") - - async def step(bucket, prefix, depth, work_q, dst_q): - - async for o in pp.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/", **kw): - for d in o.get("CommonPrefixes", []): - d = d.get("Prefix") - if pred is not None and not pred(d): - continue - - if depth > 1: - await work_q.put((d, depth - 1)) - else: - d = "s3://{}/{}".format(bucket, d) - await dst_q.put(d) - - bucket, prefix = s3_url_parse(url) - work_q = asyncio.LifoQueue() - work_q.put_nowait((prefix, depth)) - - while work_q.qsize() > 0: - _dir, depth = work_q.get_nowait() - await step(bucket, _dir, depth, work_q, dst_q) - - -async def s3_walker(url, nconcurrent, s3, guide=None, pred=None, glob=None, **kw): - """ - - guide(url, depth, base) -> 'dir'|'skip'|'deep' - """ - - def default_guide(url, depth, base): - return "dir" - - if guide is None: - guide = default_guide - - work_q = asyncio.Queue() - n_active = 0 - - async def step(idx): - nonlocal n_active - - x = await work_q.get() - if x is EOS_MARKER: - return EOS_MARKER - - url, depth, action = x - depth = depth + 1 - - n_active += 1 - - _files = [] - if action == "dir": - _dirs, _files = await s3_dir(url, s3=s3, pred=pred, glob=glob, **kw) - - for d in _dirs: - action = guide(d, depth=depth, base=url) - - if action != "skip": - if action not in ("dir", "deep"): - raise ValueError("Expect skip|dir|deep got: %s" % action) - - work_q.put_nowait((d, depth, action)) - - elif action == "deep": - _files = await s3_find(url, s3=s3, pred=pred, glob=glob) - else: - raise RuntimeError( - "Expected action to be one of deep|dir but found %s" % action - ) - - n_active -= 1 - - # Work queue was already empty and we didn't add any more to traverse - # and no out-standing work is running - if work_q.empty() and n_active == 0: - # Tell all 
workers in the swarm to stop - for _ in range(nconcurrent): - work_q.put_nowait(EOS_MARKER) - - return _files - - work_q.put_nowait((url, 0, "dir")) - - return step - - -class S3Fetcher(object): - def __init__( - self, - nconcurrent=24, - region_name=None, - addressing_style="path", - aws_unsigned=None, - ): - - self._closed = True - if region_name is None: - region_name = auto_find_region() - - opts = {} - if aws_unsigned is None: - aws_unsigned = _aws_unsigned_check_env() - - if aws_unsigned: - opts["signature_version"] = botocore.UNSIGNED - - s3_cfg = AioConfig( - max_pool_connections=nconcurrent, - **opts, - s3=dict(addressing_style=addressing_style), - ) - - self._nconcurrent = nconcurrent - self._async = AsyncThread() - self._s3 = None - self._s3_ctx = None - self._session = None - - async def setup(s3_cfg): - session = get_session() - s3_ctx = session.create_client("s3", region_name=region_name, config=s3_cfg) - s3 = await s3_ctx.__aenter__() - return (session, s3, s3_ctx) - - session, s3, s3_ctx = self._async.submit(setup, s3_cfg).result() - self._closed = False - self._session = session - self._s3 = s3 - self._s3_ctx = s3_ctx - - def close(self): - async def _close(s3): - await s3.close() - - if not self._closed: - self._async.submit(_close, self._s3).result() - self._async.terminate() - self._closed = True - - def __del__(self): - self.close() - - def list_dir(self, url, **kw): - """Returns a future object""" - - async def action(url, s3, **kw): - return await s3_dir(url, s3=s3, **kw) - - return self._async.submit(action, url, self._s3, **kw) - - def find_all(self, url, pred=None, glob=None, **kw): - """List all objects under certain path - - Returns a future object that resolves to a list of s3 object metadata - - each s3 object is represented by a SimpleNamespace with attributes: - - url - - size - - last_modified - - etag - """ - if glob is None and isinstance(pred, str): - pred, glob = None, pred - - async def action(url, s3, **kw): - return await s3_find(url, s3=s3, pred=pred, glob=glob, **kw) - - return self._async.submit(action, url, self._s3, **kw) - - def find(self, url, pred=None, glob=None, **kw): - """List all objects under certain path - - Returns an iterator of s3 object metadata - - each s3 object is represented by a SimpleNamespace with attributes: - - url - - size - - last_modified - - etag - """ - if glob is None and isinstance(pred, str): - pred, glob = None, pred - - async def find_to_queue(url, s3, q, **kw): - async def on_file(x): - await q.put(x) - - try: - await _s3_find_via_cbk(url, on_file, s3=s3, pred=pred, glob=glob, **kw) - except Exception: - return False - finally: - await q.put(EOS_MARKER) - return True - - q = asyncio.Queue(1000) - ff = self._async.submit(find_to_queue, url, self._s3, q, **kw) - clean_exit = False - raise_error = False - - try: - yield from self._async.from_queue(q) - raise_error = not ff.result() - clean_exit = True - finally: - if not clean_exit: - ff.cancel() - if raise_error: - raise IOError(f"Failed to list: {url}") - - def dir_dir(self, url, depth, pred=None, **kw): - async def action(q, s3, **kw): - try: - await s3_dir_dir(url, depth, q, s3, pred=pred, **kw) - finally: - await q.put(EOS_MARKER) - - q = asyncio.Queue(1000) - ff = self._async.submit(action, q, self._s3, **kw) - clean_exit = False - - try: - yield from self._async.from_queue(q) - ff.result() - clean_exit = True - finally: - if not clean_exit: - ff.cancel() - - def head_object(self, url, **kw): - return self._async.submit(s3_head_object, url, s3=self._s3, **kw) 
- - def fetch(self, url, range=None, **kw): - """Returns a future object""" - return self._async.submit(s3_fetch_object, url, s3=self._s3, range=range, **kw) - - def __call__(self, urls, **kw): - """Fetch a bunch of s3 urls concurrently. - - urls -- sequence of , where range is (in:int,out:int)|None - - On output is a sequence of result objects, note that order is not - preserved, but one should get one result for every input. - - Successful results object will contain: - .url = url - .data = bytes - .last_modified -- last modified timestamp - .range = None | (in,out) - .error = None - - Failed result looks like this: - .url = url - .data = None - .last_modified = None - .range = None | (in, out) - .error = str| botocore.Exception class - - """ - - def generate_requests(urls, s3, **kw): - for url in urls: - if isinstance(url, tuple): - url, range = url - else: - range = None - - yield self._async.submit(s3_fetch_object, url, s3=s3, range=range, **kw) - - for rr, ee in future_results( - generate_requests(urls, self._s3, **kw), self._nconcurrent * 2 - ): - if ee is not None: - assert not "s3_fetch_object should not raise exceptions, but did" - else: - yield rr - - -def s3_find_glob( - glob_pattern: str, skip_check: bool = False, s3: Optional[S3Fetcher] = None, **kw -) -> Iterator[Any]: - """ - Build generator from supplied S3 URI glob pattern - - Arguments: - glob_pattern {str} -- Glob pattern to filter S3 Keys by - skip_check {bool} -- Skip validity check for S3 Key - Raises: - ve: ValueError if the glob pattern cannot be parsed - """ - if s3 is None: - s3 = S3Fetcher() - - def do_file_query(qq, pred, dirs_pred=None): - for d in s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw): - _, _files = s3.list_dir(d, **kw).result() - for f in _files: - if pred(f): - yield f - - def do_file_query2(qq, dirs_pred=None): - fname = qq.file - - stream = s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw) - - if skip_check: - yield from (SimpleNamespace(url=d + fname) for d in stream) - return - - stream = (s3.head_object(d + fname, **kw) for d in stream) - - for (f, _), _ in future_results(stream, 32): - if f is not None: - yield f - - def do_dir_query(qq, dirs_pred=None): - return ( - SimpleNamespace(url=url) - for url in s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw) - ) - - try: - qq = parse_query(glob_pattern) - except ValueError as ve: - logging.error(f"URI glob-pattern not understood : {ve}") - raise ve - - glob_or_file = qq.glob or qq.file - - if qq.depth is None and glob_or_file is None: - stream = s3.find(qq.base, **kw) - elif qq.depth is None or qq.depth < 0: - if qq.glob: - stream = s3.find(qq.base, glob=qq.glob, **kw) - elif qq.file: - postfix = "/" + qq.file - stream = s3.find(qq.base, pred=lambda o: o.url.endswith(postfix), **kw) - else: - # fixed depth query - _, prefix = s3_url_parse(glob_pattern) - dirs_glob = prefix.split("/")[:-1] - - def dirs_pred(f): - n = f.count("/") - _glob = "/".join(dirs_glob[:n]) + "/" - return fnmatch(f, _glob) - - if qq.glob is not None: - pred = norm_predicate(glob=qq.glob) - stream = do_file_query(qq, pred, dirs_pred=dirs_pred) - elif qq.file is not None: - stream = do_file_query2(qq, dirs_pred=dirs_pred) - else: - stream = do_dir_query(qq, dirs_pred=dirs_pred) - return stream +__all__ = [ + "__version__", + "s3_dir", + "s3_dir_dir", + "s3_find", + "s3_find_glob", + "s3_head_object", + "S3Fetcher", + "s3_walker", + "s3_url_parse", + "s3_file_info", + "auto_find_region", +] diff --git a/libs/cloud/odc/aio/_impl.py b/libs/cloud/odc/aio/_impl.py new file 
mode 100644 index 000000000..a2422a18f --- /dev/null +++ b/libs/cloud/odc/aio/_impl.py @@ -0,0 +1,589 @@ +import asyncio +import logging +import os +from fnmatch import fnmatch +from types import SimpleNamespace +from typing import Any, Iterator, Optional + +import botocore +from aiobotocore.config import AioConfig +from aiobotocore.session import get_session +from botocore.exceptions import BotoCoreError, ClientError +from odc.aws import ( + _aws_unsigned_check_env, + auto_find_region, + norm_predicate, + s3_file_info, + s3_fmt_range, + s3_url_parse, +) +from odc.aws._find import parse_query +from odc.ppt import EOS_MARKER, future_results +from odc.ppt.async_thread import AsyncThread + + +async def _s3_fetch_object(url, s3, _range=None, **kw): + """returns object with + + On success: + .url = url + .data = bytes + .last_modified -- last modified timestamp + .range = None | (in,out) + .error = None + + On failure: + .url = url + .data = None + .last_modified = None + .range = None | (in, out) + .error = str| botocore.Exception class + """ + + def result(data=None, last_modified=None, error=None): + return SimpleNamespace( + url=url, data=data, error=error, last_modified=last_modified, range=_range + ) + + bucket, key = s3_url_parse(url) + extra_args = dict(**kw) + + if _range is not None: + try: + extra_args["Range"] = s3_fmt_range(_range) + except Exception: # pylint:disable=broad-except + return result(error="Bad range passed in: " + str(_range)) + + try: + obj = await s3.get_object(Bucket=bucket, Key=key, **extra_args) + stream = obj.get("Body", None) + if stream is None: + return result(error="Missing Body in response") + async with stream: + data = await stream.read() + except (ClientError, BotoCoreError) as e: + return result(error=e) + except Exception as e: # pylint:disable=broad-except + return result(error="Some Error: " + str(e)) + + last_modified = obj.get("LastModified", None) + return result(data=data, last_modified=last_modified) + + +async def _s3_find_via_cbk(url, cbk, s3, pred=None, glob=None, **kw): + """List all objects under certain path + + each s3 object is represented by a SimpleNamespace with attributes: + - url + - size + - last_modified + - etag + """ + pred = norm_predicate(pred=pred, glob=glob) + + bucket, prefix = s3_url_parse(url) + + if len(prefix) > 0 and not prefix.endswith("/"): + prefix = prefix + "/" + + pp = s3.get_paginator("list_objects_v2") + + n_total, n = 0, 0 + + async for o in pp.paginate(Bucket=bucket, Prefix=prefix, **kw): + for f in o.get("Contents", []): + n_total += 1 + f = s3_file_info(f, bucket) + if pred is None or pred(f): + n += 1 + await cbk(f) + + return n_total, n + + +async def s3_find(url, s3, pred=None, glob=None, **kw): + """List all objects under certain path + + each s3 object is represented by a SimpleNamespace with attributes: + - url + - size + - last_modified + - etag + """ + _files = [] + + async def on_file(f): + _files.append(f) + + await _s3_find_via_cbk(url, on_file, s3=s3, pred=pred, glob=glob, **kw) + + return _files + + +async def s3_head_object(url, s3, **kw): + """Run head_object return Result or Error + + (Result, None) -- on success + (None, error) -- on failure + + """ + + def unpack(url, rr): + return SimpleNamespace( + url=url, + size=rr.get("ContentLength", 0), + etag=rr.get("ETag", ""), + last_modified=rr.get("LastModified"), + expiration=rr.get("Expiration"), + ) + + bucket, key = s3_url_parse(url) + try: + rr = await s3.head_object(Bucket=bucket, Key=key, **kw) + except (ClientError, BotoCoreError) as e: 
+ return (None, e) + + return (unpack(url, rr), None) + + +async def s3_dir(url, s3, pred=None, glob=None, **kw): + """List s3 "directory" without descending into sub directories. + + pred: predicate for file objects file_info -> True|False + glob: glob pattern for files only + + Returns: (dirs, files) + + where + dirs -- list of subdirectories in `s3://bucket/path/` format + + files -- list of objects with attributes: url, size, last_modified, etag + """ + bucket, prefix = s3_url_parse(url) + pred = norm_predicate(pred=pred, glob=glob) + + if len(prefix) > 0 and not prefix.endswith("/"): + prefix = prefix + "/" + + pp = s3.get_paginator("list_objects_v2") + + _dirs = [] + _files = [] + + async for o in pp.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/", **kw): + for d in o.get("CommonPrefixes", []): + d = d.get("Prefix") + _dirs.append(f"s3://{bucket}/{d}") + for f in o.get("Contents", []): + f = s3_file_info(f, bucket) + if pred is None or pred(f): + _files.append(f) + + return _dirs, _files + + +async def s3_dir_dir(url, depth, dst_q, s3, pred=None, **kw): + """Find directories certain depth from the base, push them to the `dst_q` + + ``` + s3://bucket/a + |- b1 + |- c1/... + |- c2/... + |- some_file.txt + |- b2 + |- c3/... + ``` + + Given a bucket structure above, calling this function with + + - url s3://bucket/a/ + - depth=1 will produce + - s3://bucket/a/b1/ + - s3://bucket/a/b2/ + - depth=2 will produce + - s3://bucket/a/b1/c1/ + - s3://bucket/a/b1/c2/ + - s3://bucket/a/b2/c3/ + + Any files are ignored. + + If `pred` is supplied it is expected to be a `str -> bool` mapping, on + input full path of the sub-directory is given (e.g `a/b1/`) starting from + root, but not including bucket name. Sub-directory is only traversed + further if predicate returns True. 
+ """ + if not url.endswith("/"): + url = url + "/" + + if depth == 0: + await dst_q.put(url) + return + + pp = s3.get_paginator("list_objects_v2") + + async def step(bucket, prefix, depth, work_q, dst_q): + + async for o in pp.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/", **kw): + for d in o.get("CommonPrefixes", []): + d = d.get("Prefix") + if pred is not None and not pred(d): + continue + + if depth > 1: + await work_q.put((d, depth - 1)) + else: + d = f"s3://{bucket}/{d}" + await dst_q.put(d) + + bucket, prefix = s3_url_parse(url) + work_q = asyncio.LifoQueue() + work_q.put_nowait((prefix, depth)) + + while work_q.qsize() > 0: + _dir, depth = work_q.get_nowait() + await step(bucket, _dir, depth, work_q, dst_q) + + +async def s3_walker(url, nconcurrent, s3, guide=None, pred=None, glob=None, **kw): + """ + + guide(url, depth, base) -> 'dir'|'skip'|'deep' + """ + + def default_guide(url, depth, base): + return "dir" + + if guide is None: + guide = default_guide + + work_q = asyncio.Queue() + n_active = 0 + + async def step(idx): + nonlocal n_active + + x = await work_q.get() + if x is EOS_MARKER: + return EOS_MARKER + + url, depth, action = x + depth = depth + 1 + + n_active += 1 + + _files = [] + if action == "dir": + _dirs, _files = await s3_dir(url, s3=s3, pred=pred, glob=glob, **kw) + + for d in _dirs: + action = guide(d, depth=depth, base=url) + + if action != "skip": + if action not in ("dir", "deep"): + raise ValueError(f"Expect skip|dir|deep got: {action}") + + work_q.put_nowait((d, depth, action)) + + elif action == "deep": + _files = await s3_find(url, s3=s3, pred=pred, glob=glob) + else: + raise RuntimeError( + f"Expected action to be one of deep|dir but found {action}" + ) + + n_active -= 1 + + # Work queue was already empty and we didn't add any more to traverse + # and no out-standing work is running + if work_q.empty() and n_active == 0: + # Tell all workers in the swarm to stop + for _ in range(nconcurrent): + work_q.put_nowait(EOS_MARKER) + + return _files + + work_q.put_nowait((url, 0, "dir")) + + return step + + +class S3Fetcher: + def __init__( + self, + nconcurrent=24, + region_name=None, + addressing_style="path", + aws_unsigned=None, + ): + + self._closed = True + if region_name is None: + region_name = auto_find_region() + + opts = {} + if aws_unsigned is None: + aws_unsigned = _aws_unsigned_check_env() + + if aws_unsigned: + opts["signature_version"] = botocore.UNSIGNED + + s3_cfg = AioConfig( + max_pool_connections=nconcurrent, + **opts, + s3=dict(addressing_style=addressing_style), + ) + + self._nconcurrent = nconcurrent + self._async = AsyncThread() + self._s3 = None + self._s3_ctx = None + self._session = None + + async def setup(s3_cfg): + session = get_session() + s3_ctx = session.create_client( + "s3", + region_name=region_name, + config=s3_cfg, + endpoint_url=os.environ.get("AWS_S3_ENDPOINT"), + ) + s3 = await s3_ctx.__aenter__() + return session, s3, s3_ctx + + session, s3, s3_ctx = self._async.submit(setup, s3_cfg).result() + self._closed = False + self._session = session + self._s3 = s3 + self._s3_ctx = s3_ctx + + def close(self): + async def _close(s3): + await s3.close() + + if not self._closed: + self._async.submit(_close, self._s3).result() + self._async.terminate() + self._closed = True + + def __del__(self): + self.close() + + def list_dir(self, url, **kw): + """Returns a future object""" + + async def action(url, s3, **kw): + return await s3_dir(url, s3=s3, **kw) + + return self._async.submit(action, url, self._s3, **kw) + + def 
find_all(self, url, pred=None, glob=None, **kw):
+        """List all objects under certain path
+
+        Returns a future object that resolves to a list of s3 object metadata
+
+        each s3 object is represented by a SimpleNamespace with attributes:
+        - url
+        - size
+        - last_modified
+        - etag
+        """
+        if glob is None and isinstance(pred, str):
+            pred, glob = None, pred
+
+        async def action(url, s3, **kw):
+            return await s3_find(url, s3=s3, pred=pred, glob=glob, **kw)
+
+        return self._async.submit(action, url, self._s3, **kw)
+
+    def find(self, url, pred=None, glob=None, **kw):
+        """List all objects under certain path
+
+        Returns an iterator of s3 object metadata
+
+        each s3 object is represented by a SimpleNamespace with attributes:
+        - url
+        - size
+        - last_modified
+        - etag
+        """
+        if glob is None and isinstance(pred, str):
+            pred, glob = None, pred
+
+        async def find_to_queue(url, s3, q, **kw):
+            async def on_file(x):
+                await q.put(x)
+
+            try:
+                await _s3_find_via_cbk(url, on_file, s3=s3, pred=pred, glob=glob, **kw)
+            except Exception:  # pylint:disable=broad-except
+                return False
+            finally:
+                await q.put(EOS_MARKER)
+            return True
+
+        q = asyncio.Queue(1000)
+        ff = self._async.submit(find_to_queue, url, self._s3, q, **kw)
+        clean_exit = False
+        raise_error = False
+
+        try:
+            yield from self._async.from_queue(q)
+            raise_error = not ff.result()
+            clean_exit = True
+        finally:
+            if not clean_exit:
+                ff.cancel()
+            if raise_error:
+                raise IOError(f"Failed to list: {url}")
+
+    def dir_dir(self, url, depth, pred=None, **kw):
+        async def action(q, s3, **kw):
+            try:
+                await s3_dir_dir(url, depth, q, s3, pred=pred, **kw)
+            finally:
+                await q.put(EOS_MARKER)
+
+        q = asyncio.Queue(1000)
+        ff = self._async.submit(action, q, self._s3, **kw)
+        clean_exit = False
+
+        try:
+            yield from self._async.from_queue(q)
+            ff.result()
+            clean_exit = True
+        finally:
+            if not clean_exit:
+                ff.cancel()
+
+    def head_object(self, url, **kw):
+        return self._async.submit(s3_head_object, url, s3=self._s3, **kw)
+
+    def fetch(self, url, _range=None, **kw):
+        """Returns a future object"""
+        return self._async.submit(
+            _s3_fetch_object, url, s3=self._s3, _range=_range, **kw
+        )
+
+    def __call__(self, urls, **kw):
+        """Fetch a bunch of s3 urls concurrently.
+
+        urls -- sequence of , where range is (in:int,out:int)|None
+
+        On output is a sequence of result objects, note that order is not
+        preserved, but one should get one result for every input.
+ + Successful results object will contain: + .url = url + .data = bytes + .last_modified -- last modified timestamp + .range = None | (in,out) + .error = None + + Failed result looks like this: + .url = url + .data = None + .last_modified = None + .range = None | (in, out) + .error = str| botocore.Exception class + + """ + + def generate_requests(urls, s3, **kw): + for url in urls: + if isinstance(url, tuple): + url, _range = url + else: + _range = None + + yield self._async.submit( + _s3_fetch_object, url, s3=s3, _range=_range, **kw + ) + + for rr, ee in future_results( + generate_requests(urls, self._s3, **kw), self._nconcurrent * 2 + ): + if ee is not None: + assert not "s3_fetch_object should not raise exceptions, but did" + else: + yield rr + + +def s3_find_glob( + glob_pattern: str, skip_check: bool = False, s3: Optional[S3Fetcher] = None, **kw +) -> Iterator[Any]: + """ + Build generator from supplied S3 URI glob pattern + + Arguments: + glob_pattern {str} -- Glob pattern to filter S3 Keys by + skip_check {bool} -- Skip validity check for S3 Key + Raises: + ve: ValueError if the glob pattern cannot be parsed + """ + if s3 is None: + s3 = S3Fetcher() + + def do_file_query(qq, pred, dirs_pred=None): + for d in s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw): + _, _files = s3.list_dir(d, **kw).result() + for f in _files: + if pred(f): + yield f + + def do_file_query2(qq, dirs_pred=None): + fname = qq.file + + stream = s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw) + + if skip_check: + yield from (SimpleNamespace(url=d + fname) for d in stream) + return + + stream = (s3.head_object(d + fname, **kw) for d in stream) + + for (f, _), _ in future_results(stream, 32): + if f is not None: + yield f + + def do_dir_query(qq, dirs_pred=None): + return ( + SimpleNamespace(url=url) + for url in s3.dir_dir(qq.base, qq.depth, pred=dirs_pred, **kw) + ) + + try: + qq = parse_query(glob_pattern) + except ValueError as ve: + logging.error("URI glob-pattern not understood: %s", ve) + raise ve + + glob_or_file = qq.glob or qq.file + + if qq.depth is None and glob_or_file is None: + stream = s3.find(qq.base, **kw) + elif qq.depth is None or qq.depth < 0: + if qq.glob: + stream = s3.find(qq.base, glob=qq.glob, **kw) + elif qq.file: + postfix = "/" + qq.file + stream = s3.find(qq.base, pred=lambda o: o.url.endswith(postfix), **kw) + else: + # fixed depth query + _, prefix = s3_url_parse(glob_pattern) + dirs_glob = prefix.split("/")[:-1] + + def dirs_pred(f): + n = f.count("/") + _glob = "/".join(dirs_glob[:n]) + "/" + return fnmatch(f, _glob) + + if qq.glob is not None: + pred = norm_predicate(glob=qq.glob) + stream = do_file_query(qq, pred, dirs_pred=dirs_pred) + elif qq.file is not None: + stream = do_file_query2(qq, dirs_pred=dirs_pred) + else: + stream = do_dir_query(qq, dirs_pred=dirs_pred) + + return stream diff --git a/libs/cloud/odc/azure/__init__.py b/libs/cloud/odc/azure/__init__.py index 6a5b2fbdd..2f7a0dd3f 100644 --- a/libs/cloud/odc/azure/__init__.py +++ b/libs/cloud/odc/azure/__init__.py @@ -1,86 +1,8 @@ """Azure blob storage crawling and YAML fetching utilities """ -from ..cloud._version import __version__ -from azure.storage.blob import ContainerClient, BlobClient -from typing import List, Tuple, Optional -from multiprocessing.dummy import Pool as ThreadPool -from functools import partial +from odc.cloud._version import __version__ -def find_blobs( - container_name: str, - credential: str, - prefix: str, - suffix: str, - account_url: Optional[str] = None, -): - if account_url is 
not None:
-        container = ContainerClient(
-            account_url=account_url,
-            container_name=container_name,
-            credential=credential,
-        )
-    else:
-        container = ContainerClient.from_connection_string(
-            conn_str=credential, container_name=container_name
-        )
-    for blob_record in container.list_blobs(name_starts_with=prefix):
-        blob_name = blob_record["name"]
-        if blob_name.endswith(suffix):
-            yield blob_name
+from ._impl import download_blob, download_yamls, find_blobs
-
-def download_yamls(
-    account_url: str,
-    container_name: str,
-    credential: str,
-    yaml_urls: List[str],
-    workers: int = 32,
-) -> List[Tuple[Optional[bytes], str, Optional[str]]]:
-    """Download all YAML's in a list of blob names and generate content
-    Arguments:
-        account_url {str} -- Azure account url
-        container_name {str} -- Azure container name
-        credential {str} -- Azure credential token
-
-        yaml_urls {list} -- List of URL's to download YAML's from
-        workers {int} -- Number of workers to use for Thredds Downloading
-    Returns:
-        list -- tuples of contents and filenames
-    """
-    # use a threadpool to download from Azure blobstore
-
-    pool = ThreadPool(workers)
-    yamls = pool.map(
-        partial(download_blob, account_url, container_name, credential), yaml_urls
-    )
-    pool.close()
-    pool.join()
-    return yamls
-
-
-def download_blob(
-    account_url: Optional[str], container_name: str, credential: str, blob_name: str
-) -> Tuple[Optional[bytes], str, Optional[str]]:
-    """Internal method to download YAML's from Azure via BlobClient
-    Arguments:
-        account_url {str} -- Azure account url
-        container_name {str} -- Azure container name
-        credential {str} -- Azure credential token
-        blob_name {str} -- Blob name to download
-    Returns:
-        tuple -- URL content, target file and placeholder for error
-    """
-    if account_url is not None:
-        blob = BlobClient(
-            account_url=account_url,
-            container_name=container_name,
-            credential=credential,
-            blob_name=blob_name,
-        )
-    else:
-        blob = BlobClient.from_connection_string(
-            conn_str=credential, container_name=container_name, blob_name=blob_name
-        )
-
-    return (blob.download_blob().readall(), blob.url, None)
+__all__ = ["__version__", "download_blob", "download_yamls", "find_blobs"]
diff --git a/libs/cloud/odc/azure/_impl.py b/libs/cloud/odc/azure/_impl.py
new file mode 100644
index 000000000..3134e3415
--- /dev/null
+++ b/libs/cloud/odc/azure/_impl.py
@@ -0,0 +1,84 @@
+from functools import partial
+from multiprocessing.dummy import Pool as ThreadPool
+from typing import List, Optional, Tuple
+
+from azure.storage.blob import BlobClient, ContainerClient
+
+
+def find_blobs(
+    container_name: str,
+    credential: str,
+    prefix: str,
+    suffix: str,
+    account_url: Optional[str] = None,
+):
+    if account_url is not None:
+        container = ContainerClient(
+            account_url=account_url,
+            container_name=container_name,
+            credential=credential,
+        )
+    else:
+        container = ContainerClient.from_connection_string(
+            conn_str=credential, container_name=container_name
+        )
+    for blob_record in container.list_blobs(name_starts_with=prefix):
+        blob_name = blob_record["name"]
+        if blob_name.endswith(suffix):
+            yield blob_name
+
+
+def download_yamls(
+    account_url: str,
+    container_name: str,
+    credential: str,
+    yaml_urls: List[str],
+    workers: int = 32,
+) -> List[Tuple[Optional[bytes], str, Optional[str]]]:
+    """Download all YAML's in a list of blob names and generate content
+    Arguments:
+        account_url {str} -- Azure account url
+        container_name {str} -- Azure container name
+        credential {str} -- Azure credential token
+
+        
yaml_urls {list} -- List of URL's to download YAML's from + workers {int} -- Number of workers to use for Thredds Downloading + Returns: + list -- tuples of contents and filenames + """ + # use a threadpool to download from Azure blobstore + + pool = ThreadPool(workers) + yamls = pool.map( + partial(download_blob, account_url, container_name, credential), yaml_urls + ) + pool.close() + pool.join() + return yamls + + +def download_blob( + account_url: Optional[str], container_name: str, credential: str, blob_name: str +) -> Tuple[Optional[bytes], str, Optional[str]]: + """Internal method to download YAML's from Azure via BlobClient + Arguments: + account_url {str} -- Azure account url + container_name {str} -- Azure container name + credential {str} -- Azure credential token + blob_name {str} -- Blob name to download + Returns: + tuple -- URL content, target file and placeholder for error + """ + if account_url is not None: + blob = BlobClient( + account_url=account_url, + container_name=container_name, + credential=credential, + blob_name=blob_name, + ) + else: + blob = BlobClient.from_connection_string( + conn_str=credential, container_name=container_name, blob_name=blob_name + ) + + return blob.download_blob().readall(), blob.url, None From 98ed6ddc73bec9ef239daa977634ce32de005a3d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 9 Nov 2022 00:02:05 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- apps/cloud/odc/apps/cloud/azure_to_tar.py | 8 +- apps/cloud/odc/apps/cloud/gs_to_tar.py | 3 +- apps/cloud/odc/apps/cloud/redrive_to_queue.py | 5 +- apps/cloud/odc/apps/cloud/s3_find.py | 2 +- apps/cloud/odc/apps/cloud/s3_inventory.py | 3 +- apps/cloud/odc/apps/cloud/s3_to_tar.py | 6 +- apps/cloud/odc/apps/cloud/thredds_to_tar.py | 5 +- apps/dc_tools/odc/apps/dc_tools/_stac.py | 1 - .../odc/apps/dc_tools/add_update_products.py | 5 +- .../odc/apps/dc_tools/cop_dem_to_dc.py | 4 +- .../odc/apps/dc_tools/esa_worldcover_to_dc.py | 4 +- apps/dc_tools/odc/apps/dc_tools/export_md.py | 10 +- apps/dc_tools/odc/apps/dc_tools/fs_to_dc.py | 18 ++-- .../odc/apps/dc_tools/index_from_tar.py | 5 +- apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py | 11 ++- .../odc/apps/dc_tools/thredds_to_dc.py | 11 +-- apps/dc_tools/odc/apps/dc_tools/utils.py | 11 +-- apps/dc_tools/tests/conftest.py | 1 + .../tests/test_add_update_products.py | 3 +- apps/dc_tools/tests/test_cop_dem_to_dc.py | 5 +- .../tests/test_esa_worldcover_to_dc.py | 4 +- apps/dc_tools/tests/test_fs_to_dc.py | 6 +- apps/dc_tools/tests/test_sqs_to_dc.py | 20 ++-- apps/dc_tools/tests/test_stac_transform.py | 2 +- libs/algo/odc/algo/__init__.py | 95 ++++++++----------- libs/algo/odc/algo/_broadcast.py | 5 +- libs/algo/odc/algo/_dask.py | 21 ++-- libs/algo/odc/algo/_dask_stream.py | 9 +- libs/algo/odc/algo/_geomedian.py | 10 +- libs/algo/odc/algo/_memsink.py | 13 +-- libs/algo/odc/algo/_numeric.py | 3 +- libs/algo/odc/algo/_numexpr.py | 10 +- libs/algo/odc/algo/_percentile.py | 10 +- libs/algo/odc/algo/_rgba.py | 8 +- libs/algo/odc/algo/_tiff.py | 24 ++--- libs/algo/odc/algo/_tools.py | 2 +- libs/algo/odc/algo/_types.py | 3 +- libs/algo/odc/algo/_warp.py | 15 +-- libs/algo/odc/algo/io.py | 4 +- libs/algo/odc/algo/pixel.py | 6 +- libs/algo/tests/test_dask.py | 6 +- libs/algo/tests/test_io.py | 2 +- libs/algo/tests/test_masking.py | 15 ++- libs/algo/tests/test_memsink.py | 4 +- libs/algo/tests/test_numeric.py | 4 +- 
libs/algo/tests/test_percentile.py | 4 +- libs/cloud/odc/aio/__init__.py | 1 + libs/cloud/odc/aws/_find.py | 2 +- libs/cloud/odc/aws/dns.py | 2 +- libs/cloud/odc/aws/misc.py | 8 +- libs/cloud/odc/aws/queue.py | 5 +- libs/cloud/odc/ppt/async_thread.py | 1 - libs/cloud/odc/thredds/__init__.py | 11 ++- libs/cloud/tests/test_aws.py | 5 +- libs/cloud/tests/test_azure.py | 2 +- libs/cloud/tests/test_thredds.py | 2 +- libs/io/odc/io/__init__.py | 9 +- libs/io/odc/io/cgroups.py | 1 + libs/io/odc/io/tar.py | 4 +- libs/io/odc/io/text.py | 5 +- libs/io/tests/test_text.py | 2 +- libs/ui/odc/ui/__init__.py | 36 +++---- libs/ui/odc/ui/_images.py | 11 ++- libs/ui/odc/ui/_map.py | 10 +- libs/ui/odc/ui/_ui.py | 3 +- libs/ui/odc/ui/plt_tools.py | 1 + scripts/patch_version.py | 3 +- staging/dnsup/dea_dnsup/__init__.py | 4 +- staging/geom/odc/geom/__init__.py | 13 +-- 69 files changed, 274 insertions(+), 293 deletions(-) diff --git a/apps/cloud/odc/apps/cloud/azure_to_tar.py b/apps/cloud/odc/apps/cloud/azure_to_tar.py index ba973c49f..fabf8e324 100644 --- a/apps/cloud/odc/apps/cloud/azure_to_tar.py +++ b/apps/cloud/odc/apps/cloud/azure_to_tar.py @@ -1,10 +1,10 @@ import tarfile -import click -from odc.azure import find_blobs, download_yamls - -from odc.io.tar import tar_mode, add_txt_file from urllib.parse import urlparse +import click +from odc.azure import download_yamls, find_blobs +from odc.io.tar import add_txt_file, tar_mode + @click.command("azure-to-tar") @click.argument("account_url", type=str, nargs=1) diff --git a/apps/cloud/odc/apps/cloud/gs_to_tar.py b/apps/cloud/odc/apps/cloud/gs_to_tar.py index 9da871ef8..e470fc823 100644 --- a/apps/cloud/odc/apps/cloud/gs_to_tar.py +++ b/apps/cloud/odc/apps/cloud/gs_to_tar.py @@ -1,7 +1,8 @@ import os +import shutil import tarfile + import click -import shutil from google.cloud import storage diff --git a/apps/cloud/odc/apps/cloud/redrive_to_queue.py b/apps/cloud/odc/apps/cloud/redrive_to_queue.py index f12f00b58..749a2b067 100644 --- a/apps/cloud/odc/apps/cloud/redrive_to_queue.py +++ b/apps/cloud/odc/apps/cloud/redrive_to_queue.py @@ -1,6 +1,7 @@ -import click -import sys import logging +import sys + +import click from odc.aws.queue import redrive_queue diff --git a/apps/cloud/odc/apps/cloud/s3_find.py b/apps/cloud/odc/apps/cloud/s3_find.py index 70873efe5..d6386046e 100755 --- a/apps/cloud/odc/apps/cloud/s3_find.py +++ b/apps/cloud/odc/apps/cloud/s3_find.py @@ -57,7 +57,7 @@ def cli(uri, skip_check, no_sign_request=None, request_payer=False): except ValueError as ve: click.echo(str(ve), err=True) sys.exit(1) - except Exception as e: #pylint:disable=broad-except + except Exception as e: # pylint:disable=broad-except click.echo(str(e), err=True) sys.exit(1) diff --git a/apps/cloud/odc/apps/cloud/s3_inventory.py b/apps/cloud/odc/apps/cloud/s3_inventory.py index e56cb57cc..2098faf85 100644 --- a/apps/cloud/odc/apps/cloud/s3_inventory.py +++ b/apps/cloud/odc/apps/cloud/s3_inventory.py @@ -1,6 +1,7 @@ import re -from fnmatch import fnmatch import sys +from fnmatch import fnmatch + import click from odc.aws import s3_client from odc.aws.inventory import list_inventory diff --git a/apps/cloud/odc/apps/cloud/s3_to_tar.py b/apps/cloud/odc/apps/cloud/s3_to_tar.py index 8dcc7ed32..0072ab068 100755 --- a/apps/cloud/odc/apps/cloud/s3_to_tar.py +++ b/apps/cloud/odc/apps/cloud/s3_to_tar.py @@ -1,13 +1,13 @@ -import click -import tarfile import logging import signal import sys +import tarfile from sys import stderr, stdout +import click from odc.aio import S3Fetcher from 
odc.io import read_stdin_lines -from odc.io.tar import tar_mode, add_txt_file +from odc.io.tar import add_txt_file, tar_mode from odc.io.timer import RateEstimator diff --git a/apps/cloud/odc/apps/cloud/thredds_to_tar.py b/apps/cloud/odc/apps/cloud/thredds_to_tar.py index fd407847e..f08e18551 100644 --- a/apps/cloud/odc/apps/cloud/thredds_to_tar.py +++ b/apps/cloud/odc/apps/cloud/thredds_to_tar.py @@ -1,7 +1,8 @@ import tarfile + import click -from odc.io.tar import tar_mode, add_txt_file -from odc.thredds import thredds_find_glob, download_yamls +from odc.io.tar import add_txt_file, tar_mode +from odc.thredds import download_yamls, thredds_find_glob @click.command("thredds-to-tar") diff --git a/apps/dc_tools/odc/apps/dc_tools/_stac.py b/apps/dc_tools/odc/apps/dc_tools/_stac.py index bc6879c26..62f84355b 100644 --- a/apps/dc_tools/odc/apps/dc_tools/_stac.py +++ b/apps/dc_tools/odc/apps/dc_tools/_stac.py @@ -7,7 +7,6 @@ from uuid import UUID import numpy - from datacube.utils.geometry import Geometry from toolz import get_in from urlpath import URL diff --git a/apps/dc_tools/odc/apps/dc_tools/add_update_products.py b/apps/dc_tools/odc/apps/dc_tools/add_update_products.py index 152f62c05..1e474c163 100644 --- a/apps/dc_tools/odc/apps/dc_tools/add_update_products.py +++ b/apps/dc_tools/odc/apps/dc_tools/add_update_products.py @@ -6,7 +6,7 @@ import sys from collections import Counter, namedtuple from csv import DictReader -from typing import Optional +from typing import Any, Dict, List, Optional import click import datacube @@ -14,11 +14,10 @@ import yaml from datacube import Datacube from odc.apps.dc_tools.utils import ( - update_if_exists, statsd_gauge_reporting, statsd_setting, + update_if_exists, ) -from typing import Any, Dict, List Product = namedtuple("Product", ["name", "doc"]) diff --git a/apps/dc_tools/odc/apps/dc_tools/cop_dem_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/cop_dem_to_dc.py index 910a3590e..6f4118c5e 100644 --- a/apps/dc_tools/odc/apps/dc_tools/cop_dem_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/cop_dem_to_dc.py @@ -16,13 +16,13 @@ from datacube.utils import read_documents from odc.apps.dc_tools.utils import ( SkippedException, + archive_less_mature, bbox, index_update_dataset, limit, - update_if_exists, - archive_less_mature, statsd_gauge_reporting, statsd_setting, + update_if_exists, ) from rio_stac import create_stac_item diff --git a/apps/dc_tools/odc/apps/dc_tools/esa_worldcover_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/esa_worldcover_to_dc.py index 9a5c63efb..0084f69a4 100644 --- a/apps/dc_tools/odc/apps/dc_tools/esa_worldcover_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/esa_worldcover_to_dc.py @@ -15,13 +15,13 @@ from datacube.index.hl import Doc2Dataset from datacube.utils import read_documents from odc.apps.dc_tools.utils import ( + archive_less_mature, bbox, index_update_dataset, limit, - update_if_exists, - archive_less_mature, statsd_gauge_reporting, statsd_setting, + update_if_exists, ) from rio_stac import create_stac_item diff --git a/apps/dc_tools/odc/apps/dc_tools/export_md.py b/apps/dc_tools/odc/apps/dc_tools/export_md.py index 19162b2c4..2fb852c9f 100644 --- a/apps/dc_tools/odc/apps/dc_tools/export_md.py +++ b/apps/dc_tools/odc/apps/dc_tools/export_md.py @@ -9,14 +9,14 @@ default: ['3', '4', '5'] swir: ['1', '2'] """ -from pathlib import Path -import yaml -import click import logging +from pathlib import Path -from datacube.testutils.io import native_geobox -from datacube.storage import BandInfo +import click +import yaml from datacube import 
Datacube +from datacube.storage import BandInfo +from datacube.testutils.io import native_geobox _LOG = logging.getLogger(__name__) diff --git a/apps/dc_tools/odc/apps/dc_tools/fs_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/fs_to_dc.py index e113671cd..545e8ff6a 100755 --- a/apps/dc_tools/odc/apps/dc_tools/fs_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/fs_to_dc.py @@ -1,24 +1,22 @@ import json +import logging from pathlib import Path +from typing import Generator, Optional import click import datacube +import yaml from datacube.index.hl import Doc2Dataset +from odc.apps.dc_tools._stac import stac_transform from odc.apps.dc_tools.utils import ( - index_update_dataset, - update_if_exists, - archive_less_mature, allow_unsafe, - transform_stac, + archive_less_mature, + index_update_dataset, statsd_gauge_reporting, statsd_setting, + transform_stac, + update_if_exists, ) -from odc.apps.dc_tools._stac import stac_transform -from typing import Generator, Optional -import logging - - -import yaml logging.basicConfig( level=logging.WARNING, diff --git a/apps/dc_tools/odc/apps/dc_tools/index_from_tar.py b/apps/dc_tools/odc/apps/dc_tools/index_from_tar.py index 3f8d9e744..3f8879ddc 100644 --- a/apps/dc_tools/odc/apps/dc_tools/index_from_tar.py +++ b/apps/dc_tools/odc/apps/dc_tools/index_from_tar.py @@ -6,11 +6,12 @@ import click import datacube from datacube.utils.changes import allow_any -from ._docs import from_yaml_doc_stream -from ._stac import stac_transform from odc.io.tar import tar_doc_stream, tar_mode from odc.io.timer import RateEstimator +from ._docs import from_yaml_doc_stream +from ._stac import stac_transform + def from_tar_file(tarfname, index, mk_uri, mode, doc_transform=None, **kwargs): """returns a sequence of tuples where each tuple is either diff --git a/apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py index 1516d18a1..bc2c956d5 100644 --- a/apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py @@ -9,11 +9,11 @@ from typing import Tuple import boto3 -from botocore import UNSIGNED -from botocore.config import Config import click import pandas as pd import requests +from botocore import UNSIGNED +from botocore.config import Config from datacube import Datacube from datacube.index.hl import Doc2Dataset from datacube.utils import documents @@ -22,25 +22,26 @@ SkippedException, allow_unsafe, archive, + archive_less_mature, fail_on_missing_lineage, index_update_dataset, limit, no_sign_request, skip_lineage, - statsd_setting, statsd_gauge_reporting, + statsd_setting, transform_stac, transform_stac_absolute, - archive_less_mature, update, update_if_exists, verify_lineage, ) from odc.aws.queue import get_messages -from ._stac import stac_transform, stac_transform_absolute from toolz import dicttoolz from yaml import safe_load +from ._stac import stac_transform, stac_transform_absolute + # Added log handler logging.basicConfig(level=logging.WARNING, handlers=[logging.StreamHandler()]) diff --git a/apps/dc_tools/odc/apps/dc_tools/thredds_to_dc.py b/apps/dc_tools/odc/apps/dc_tools/thredds_to_dc.py index 0b5a58f1f..345c30aa9 100644 --- a/apps/dc_tools/odc/apps/dc_tools/thredds_to_dc.py +++ b/apps/dc_tools/odc/apps/dc_tools/thredds_to_dc.py @@ -1,17 +1,16 @@ """Crawl Thredds for prefixes and fetch YAML's for indexing and dump them into a Datacube instance """ -import sys import logging -from typing import Tuple +import sys +from typing import List, Tuple import click -from odc.thredds import thredds_find_glob, 
download_yamls -from odc.apps.dc_tools.utils import statsd_gauge_reporting, statsd_setting -from ._docs import from_yaml_doc_stream from datacube import Datacube +from odc.apps.dc_tools.utils import statsd_gauge_reporting, statsd_setting +from odc.thredds import download_yamls, thredds_find_glob -from typing import List, Tuple +from ._docs import from_yaml_doc_stream def dump_list_to_odc( diff --git a/apps/dc_tools/odc/apps/dc_tools/utils.py b/apps/dc_tools/odc/apps/dc_tools/utils.py index 8f8e0a11c..1bf2c20f3 100644 --- a/apps/dc_tools/odc/apps/dc_tools/utils.py +++ b/apps/dc_tools/odc/apps/dc_tools/utils.py @@ -1,16 +1,13 @@ -import os import logging -import click -import pkg_resources - +import os from typing import Iterable, Optional, Union +import click +import pkg_resources from datacube import Datacube from datacube.index.hl import Doc2Dataset from datacube.utils import changes - -from datadog import statsd, initialize - +from datadog import initialize, statsd ESRI_LANDCOVER_BASE_URI = ( "https://ai4edataeuwest.blob.core.windows.net/io-lulc/" diff --git a/apps/dc_tools/tests/conftest.py b/apps/dc_tools/tests/conftest.py index 9328858e6..ac5b7e043 100644 --- a/apps/dc_tools/tests/conftest.py +++ b/apps/dc_tools/tests/conftest.py @@ -1,5 +1,6 @@ import json from pathlib import Path + import pytest import yaml from datacube import Datacube diff --git a/apps/dc_tools/tests/test_add_update_products.py b/apps/dc_tools/tests/test_add_update_products.py index 779b3b856..d0002f239 100644 --- a/apps/dc_tools/tests/test_add_update_products.py +++ b/apps/dc_tools/tests/test_add_update_products.py @@ -1,11 +1,10 @@ from pathlib import Path import pytest -from datacube import Datacube from click.testing import CliRunner +from datacube import Datacube from odc.apps.dc_tools.add_update_products import _get_product, _parse_csv, cli - TEST_DATA_FOLDER: Path = Path(__file__).parent.joinpath("data") LOCAL_EXAMPLE: str = "example_product_list.csv" PRODUCT_EXAMPLE: str = ( diff --git a/apps/dc_tools/tests/test_cop_dem_to_dc.py b/apps/dc_tools/tests/test_cop_dem_to_dc.py index a93ed3114..586713422 100644 --- a/apps/dc_tools/tests/test_cop_dem_to_dc.py +++ b/apps/dc_tools/tests/test_cop_dem_to_dc.py @@ -1,9 +1,6 @@ import pytest - from click.testing import CliRunner - -from odc.apps.dc_tools.cop_dem_to_dc import get_dem_tile_uris, cli - +from odc.apps.dc_tools.cop_dem_to_dc import cli, get_dem_tile_uris PRODUCTS = ["cop_30", "cop_90"] diff --git a/apps/dc_tools/tests/test_esa_worldcover_to_dc.py b/apps/dc_tools/tests/test_esa_worldcover_to_dc.py index b0a2cd11e..19f64be40 100644 --- a/apps/dc_tools/tests/test_esa_worldcover_to_dc.py +++ b/apps/dc_tools/tests/test_esa_worldcover_to_dc.py @@ -1,8 +1,6 @@ import pytest - from click.testing import CliRunner - -from odc.apps.dc_tools.esa_worldcover_to_dc import get_tile_uris, cli, _unpack_bbox +from odc.apps.dc_tools.esa_worldcover_to_dc import _unpack_bbox, cli, get_tile_uris @pytest.fixture diff --git a/apps/dc_tools/tests/test_fs_to_dc.py b/apps/dc_tools/tests/test_fs_to_dc.py index 41857c817..9351ff8a0 100644 --- a/apps/dc_tools/tests/test_fs_to_dc.py +++ b/apps/dc_tools/tests/test_fs_to_dc.py @@ -1,9 +1,9 @@ +from pathlib import Path + import pytest from click.testing import CliRunner -from odc.apps.dc_tools.fs_to_dc import cli, _find_files from datacube import Datacube - -from pathlib import Path +from odc.apps.dc_tools.fs_to_dc import _find_files, cli TEST_DATA_FOLDER: Path = Path(__file__).parent.joinpath("data") diff --git 
a/apps/dc_tools/tests/test_sqs_to_dc.py b/apps/dc_tools/tests/test_sqs_to_dc.py index afdcc4a97..616974794 100644 --- a/apps/dc_tools/tests/test_sqs_to_dc.py +++ b/apps/dc_tools/tests/test_sqs_to_dc.py @@ -2,30 +2,26 @@ Test for SQS to DC tool """ import json +import os from functools import partial +from pathlib import Path from pprint import pformat -import pytest - import boto3 -from moto import mock_sqs - -from pathlib import Path - +import pytest +from datacube import Datacube +from datacube.index.hl import Doc2Dataset from datacube.utils import documents from deepdiff import DeepDiff +from moto import mock_sqs from odc.apps.dc_tools._stac import stac_transform from odc.apps.dc_tools.sqs_to_dc import ( - handle_json_message, - handle_bucket_notification_message, extract_metadata_from_message, + handle_bucket_notification_message, + handle_json_message, ) from odc.apps.dc_tools.utils import index_update_dataset - -from datacube import Datacube -from datacube.index.hl import Doc2Dataset from odc.aws.queue import get_messages -import os record_message = { "Records": [ diff --git a/apps/dc_tools/tests/test_stac_transform.py b/apps/dc_tools/tests/test_stac_transform.py index 5ba84ef48..4d0ca6763 100644 --- a/apps/dc_tools/tests/test_stac_transform.py +++ b/apps/dc_tools/tests/test_stac_transform.py @@ -2,8 +2,8 @@ Test for stac_transform """ -from odc.apps.dc_tools._stac import stac_transform from datacube.utils.changes import get_doc_changes +from odc.apps.dc_tools._stac import stac_transform def test_landsat_stac_transform(landsat_stac, landsat_odc): diff --git a/libs/algo/odc/algo/__init__.py b/libs/algo/odc/algo/__init__.py index 0ce9f5be6..14e35975c 100644 --- a/libs/algo/odc/algo/__init__.py +++ b/libs/algo/odc/algo/__init__.py @@ -2,78 +2,59 @@ """ -from ._version import __version__ -from ._numexpr import apply_numexpr, safe_div - +from ._broadcast import pool_broadcast +from ._dask import ( + chunked_persist, + chunked_persist_da, + chunked_persist_ds, + randomize, + reshape_yxbt, + wait_for_future, +) +from ._dask_stream import dask_compute_stream, seq_to_bags +from ._geomedian import ( + geomedian_with_mads, + int_geomedian, + int_geomedian_np, + reshape_for_geomedian, + xr_geomedian, +) from ._masking import ( - keep_good_np, - keep_good_only, + binary_closing, + binary_dilation, + binary_erosion, + binary_opening, + choose_first_valid, + enum_to_bool, erase_bad, + fmask_to_bool, from_float, from_float_np, + gap_fill, + keep_good_np, + keep_good_only, + mask_cleanup, + mask_cleanup_np, to_f32, to_f32_np, to_float, to_float_np, - fmask_to_bool, - enum_to_bool, - gap_fill, - choose_first_valid, - mask_cleanup, - mask_cleanup_np, - binary_opening, - binary_closing, - binary_dilation, - binary_erosion, ) - -from ._geomedian import ( - xr_geomedian, - reshape_for_geomedian, - geomedian_with_mads, - int_geomedian, - int_geomedian_np, -) - -from ._dask import ( - chunked_persist, - chunked_persist_ds, - chunked_persist_da, - randomize, - reshape_yxbt, - wait_for_future, -) - from ._memsink import ( - store_to_mem, - yxbt_sink_to_mem, + da_mem_sink, da_yxbt_sink, + da_yxt_sink, + store_to_mem, yxbt_sink, - da_mem_sink, + yxbt_sink_to_mem, yxt_sink, - da_yxt_sink, -) - -from ._rgba import ( - is_rgb, - to_rgba, - to_rgba_np, - colorize, ) - -from ._warp import xr_reproject - -from ._tiff import save_cog - +from ._numexpr import apply_numexpr, safe_div from ._percentile import xr_quantile -from ._broadcast import ( - pool_broadcast, -) - -from ._dask_stream import ( - dask_compute_stream, 
- seq_to_bags, -) +from ._rgba import colorize, is_rgb, to_rgba, to_rgba_np +from ._tiff import save_cog +from ._version import __version__ +from ._warp import xr_reproject __all__ = ( "apply_numexpr", diff --git a/libs/algo/odc/algo/_broadcast.py b/libs/algo/odc/algo/_broadcast.py index 94af472f6..3097e1e34 100644 --- a/libs/algo/odc/algo/_broadcast.py +++ b/libs/algo/odc/algo/_broadcast.py @@ -2,9 +2,10 @@ - pool_broadcast """ -from typing import List, Dict, Any from random import randint -from dask.distributed import Queue, Client +from typing import Any, Dict, List + +from dask.distributed import Client, Queue def _bcast_action( diff --git a/libs/algo/odc/algo/_dask.py b/libs/algo/odc/algo/_dask.py index bbdbda8b5..9ce5399fe 100644 --- a/libs/algo/odc/algo/_dask.py +++ b/libs/algo/odc/algo/_dask.py @@ -2,20 +2,23 @@ Generic dask helpers """ -from typing import Tuple, Union, cast, Iterator, Optional, List, Any, Dict, Hashable -from random import randint +import functools +from bisect import bisect_left, bisect_right from datetime import datetime -from bisect import bisect_right, bisect_left -import numpy as np -import xarray as xr +from random import randint +from typing import Any, Dict, Hashable, Iterator, List, Optional, Tuple, Union, cast + import dask -from dask.distributed import wait as dask_wait, TimeoutError import dask.array as da -from dask.highlevelgraph import HighLevelGraph -from dask import is_dask_collection -import functools +import numpy as np import toolz +import xarray as xr +from dask import is_dask_collection +from dask.distributed import TimeoutError +from dask.distributed import wait as dask_wait +from dask.highlevelgraph import HighLevelGraph from toolz import partition_all + from ._tools import ROI, roi_shape, slice_in_out diff --git a/libs/algo/odc/algo/_dask_stream.py b/libs/algo/odc/algo/_dask_stream.py index 2febd1a9a..ca8026ab3 100644 --- a/libs/algo/odc/algo/_dask_stream.py +++ b/libs/algo/odc/algo/_dask_stream.py @@ -2,13 +2,14 @@ - dask_compute_stream """ -from typing import Any, Iterable +import queue +import threading from random import randint +from typing import Any, Iterable + +import dask.bag import toolz -import queue from dask.distributed import Client -import dask.bag -import threading def _randomize(prefix): diff --git a/libs/algo/odc/algo/_geomedian.py b/libs/algo/odc/algo/_geomedian.py index d1a36f52d..53e450fe6 100644 --- a/libs/algo/odc/algo/_geomedian.py +++ b/libs/algo/odc/algo/_geomedian.py @@ -1,13 +1,15 @@ """ Helper methods for Geometric Median computation. 
""" +import functools from typing import Optional, Tuple, Union -import numpy as np -import xarray as xr + import dask import dask.array as da -import functools +import numpy as np +import xarray as xr + from ._dask import randomize, reshape_yxbt -from ._masking import to_float_np, from_float_np +from ._masking import from_float_np, to_float_np from ._memsink import yxbt_sink diff --git a/libs/algo/odc/algo/_memsink.py b/libs/algo/odc/algo/_memsink.py index 722c2d0b1..881079609 100644 --- a/libs/algo/odc/algo/_memsink.py +++ b/libs/algo/odc/algo/_memsink.py @@ -1,15 +1,16 @@ -from typing import Any, Dict, Optional, Tuple, Union, Hashable -import numpy as np -import xarray as xr +import uuid +from typing import Any, Dict, Hashable, Optional, Tuple, Union + import dask import dask.array as da -from dask.delayed import Delayed +import numpy as np +import xarray as xr from dask.base import tokenize +from dask.delayed import Delayed from dask.highlevelgraph import HighLevelGraph from distributed import Client -import uuid -from ._dask import unpack_chunks, _roi_from_chunks +from ._dask import _roi_from_chunks, unpack_chunks ShapeLike = Union[int, Tuple[int, ...]] DtypeLike = Union[str, np.dtype] diff --git a/libs/algo/odc/algo/_numeric.py b/libs/algo/odc/algo/_numeric.py index c6fa1911f..cdc9a28e6 100644 --- a/libs/algo/odc/algo/_numeric.py +++ b/libs/algo/odc/algo/_numeric.py @@ -2,9 +2,10 @@ Misc numeric tooling """ from typing import Optional, Tuple + import numpy as np -from ._types import NumpyIndex1, NumpyIndex2, NumpyIndex +from ._types import NumpyIndex, NumpyIndex1, NumpyIndex2 def half_up(n: int) -> int: diff --git a/libs/algo/odc/algo/_numexpr.py b/libs/algo/odc/algo/_numexpr.py index 60868f236..4169f9e3f 100644 --- a/libs/algo/odc/algo/_numexpr.py +++ b/libs/algo/odc/algo/_numexpr.py @@ -1,11 +1,13 @@ -from typing import Dict, Optional, Any +import functools +from typing import Any, Dict, Optional + import dask import dask.array as da +import numexpr as ne import numpy as np import xarray as xr -import numexpr as ne -import functools -from ._dask import randomize, flatten_kv, unflatten_kv + +from ._dask import flatten_kv, randomize, unflatten_kv def apply_numexpr_np( diff --git a/libs/algo/odc/algo/_percentile.py b/libs/algo/odc/algo/_percentile.py index eafc9fef3..5a6157543 100644 --- a/libs/algo/odc/algo/_percentile.py +++ b/libs/algo/odc/algo/_percentile.py @@ -1,11 +1,13 @@ +from functools import partial from typing import List, Sequence + +import dask import dask.array as da -import xarray as xr import numpy as np -from ._masking import keep_good_np +import xarray as xr from dask.base import tokenize -import dask -from functools import partial + +from ._masking import keep_good_np def np_percentile(xx, percentile, nodata): diff --git a/libs/algo/odc/algo/_rgba.py b/libs/algo/odc/algo/_rgba.py index 95a43ddef..fced8573c 100644 --- a/libs/algo/odc/algo/_rgba.py +++ b/libs/algo/odc/algo/_rgba.py @@ -1,10 +1,12 @@ """ Helpers for dealing with RGB(A) images. 
""" -import numpy as np +from typing import List, Optional, Tuple, Union + +import dask import dask.array as da +import numpy as np import xarray as xr -import dask -from typing import Tuple, Optional, List, Union + from ._dask import randomize diff --git a/libs/algo/odc/algo/_tiff.py b/libs/algo/odc/algo/_tiff.py index 536a6334d..3e59bd327 100644 --- a/libs/algo/odc/algo/_tiff.py +++ b/libs/algo/odc/algo/_tiff.py @@ -1,26 +1,26 @@ import threading import warnings -from dask.delayed import Delayed +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, Optional, Tuple, Union +from uuid import uuid4 + import dask import dask.array as da -from dask.base import tokenize -from pathlib import Path -from dataclasses import dataclass -from typing import Optional, Union, Tuple, Any, Dict -import xarray as xr import numpy as np -from affine import Affine import rasterio -from uuid import uuid4 -from rasterio.windows import Window +import xarray as xr +from affine import Affine +from dask.base import tokenize +from dask.delayed import Delayed from rasterio import MemoryFile from rasterio.shutil import copy as rio_copy +from rasterio.windows import Window +from ._numeric import half_up, np_slice_to_idx, roi_shrink2, roundup16 from ._types import NodataType, NumpyIndex -from ._numeric import roundup16, half_up, roi_shrink2, np_slice_to_idx from ._warp import _shrink2 - _UNSET = ":unset:-427d8b3f1944" @@ -500,7 +500,7 @@ def save_cog( if dst == ":mem:": extract = True elif dst.startswith("s3:"): - from odc.aws import mk_boto_session, get_creds_with_retry + from odc.aws import get_creds_with_retry, mk_boto_session if creds is None: _creds = get_creds_with_retry(mk_boto_session()) diff --git a/libs/algo/odc/algo/_tools.py b/libs/algo/odc/algo/_tools.py index dcc0fece2..b7504101f 100644 --- a/libs/algo/odc/algo/_tools.py +++ b/libs/algo/odc/algo/_tools.py @@ -1,7 +1,7 @@ """ Various utilities """ -from typing import Tuple, Union, Optional +from typing import Optional, Tuple, Union ROI = Union[slice, Tuple[slice, ...]] diff --git a/libs/algo/odc/algo/_types.py b/libs/algo/odc/algo/_types.py index 464a949a4..0eae4f22d 100644 --- a/libs/algo/odc/algo/_types.py +++ b/libs/algo/odc/algo/_types.py @@ -1,5 +1,6 @@ +from typing import Optional, Tuple, Union + import numpy as np -from typing import Union, Optional, Tuple NumpyIndex1 = Union[int, slice] NumpyIndex2 = Tuple[NumpyIndex1, NumpyIndex1] diff --git a/libs/algo/odc/algo/_warp.py b/libs/algo/odc/algo/_warp.py index 6abbcdb55..e5fe18435 100644 --- a/libs/algo/odc/algo/_warp.py +++ b/libs/algo/odc/algo/_warp.py @@ -1,26 +1,27 @@ """ Dask aware reproject implementation """ -from typing import Tuple, Optional, Union, Dict, Any +from typing import Any, Dict, Optional, Tuple, Union + +import dask.array as da +import dask.utils as du import numpy as np import xarray as xr from affine import Affine from dask import is_dask_collection -import dask.utils as du -import dask.array as da from dask.highlevelgraph import HighLevelGraph -from ._dask import randomize, crop_2d_dense, unpack_chunks, empty_maker +from datacube.utils import spatial_dims from datacube.utils.geometry import ( GeoBox, - rio_reproject, compute_reproject_roi, + rio_reproject, warp_affine, ) from datacube.utils.geometry.gbox import GeoboxTiles -from datacube.utils import spatial_dims -from ._types import NodataType +from ._dask import crop_2d_dense, empty_maker, randomize, unpack_chunks from ._numeric import shape_shrink2 +from ._types import NodataType def 
_reproject_block_impl( diff --git a/libs/algo/odc/algo/io.py b/libs/algo/odc/algo/io.py index e521571ad..ac0303889 100644 --- a/libs/algo/odc/algo/io.py +++ b/libs/algo/odc/algo/io.py @@ -1,5 +1,6 @@ """Native load and masking.""" +import json from typing import ( Callable, Dict, @@ -12,13 +13,12 @@ cast, ) -import json import xarray as xr -from pyproj import aoi, transformer from datacube import Datacube from datacube.model import Dataset from datacube.testutils.io import native_geobox from datacube.utils.geometry import GeoBox, gbox +from pyproj import aoi, transformer from ._grouper import group_by_nothing, solar_offset from ._masking import _max_fuser, _nodata_fuser, _or_fuser, enum_to_bool, mask_cleanup diff --git a/libs/algo/odc/algo/pixel.py b/libs/algo/odc/algo/pixel.py index 3da7afe52..8d0eff55f 100644 --- a/libs/algo/odc/algo/pixel.py +++ b/libs/algo/odc/algo/pixel.py @@ -1,11 +1,11 @@ """Helper methods for accessing single pixel from a rasterio file object. """ +from typing import Iterable, List, Optional, Tuple, Union + import rasterio -import rasterio.warp import rasterio.crs - -from typing import Union, Iterable, Optional, List, Tuple +import rasterio.warp RowCol = Tuple[int, int] XY = Tuple[float, float] diff --git a/libs/algo/tests/test_dask.py b/libs/algo/tests/test_dask.py index 38b861328..690bb2bbc 100644 --- a/libs/algo/tests/test_dask.py +++ b/libs/algo/tests/test_dask.py @@ -1,9 +1,9 @@ -import pytest -import numpy as np import dask.array as da +import numpy as np +import pytest +import toolz from dask import delayed from dask.distributed import Client -import toolz from odc.algo._dask import ( _rechunk_2x2, _stack_2d_np, diff --git a/libs/algo/tests/test_io.py b/libs/algo/tests/test_io.py index 15dd2698f..4a328d7f1 100644 --- a/libs/algo/tests/test_io.py +++ b/libs/algo/tests/test_io.py @@ -1,5 +1,5 @@ -from odc.algo.io import choose_transform_path import pytest +from odc.algo.io import choose_transform_path @pytest.mark.parametrize("transform_code", [None, "EPSG:9688", "EPSG:1150"]) diff --git a/libs/algo/tests/test_masking.py b/libs/algo/tests/test_masking.py index f9642bddf..72e7743dd 100644 --- a/libs/algo/tests/test_masking.py +++ b/libs/algo/tests/test_masking.py @@ -1,17 +1,16 @@ -import numpy as np -import xarray as xr import dask import dask.array as da +import numpy as np import pytest - +import xarray as xr from odc.algo._masking import ( - _gap_fill_np, - gap_fill, - fmask_to_bool, - enum_to_bool, - _get_enum_values, _enum_to_mask_numexpr, _fuse_mean_np, + _gap_fill_np, + _get_enum_values, + enum_to_bool, + fmask_to_bool, + gap_fill, mask_cleanup_np, ) diff --git a/libs/algo/tests/test_memsink.py b/libs/algo/tests/test_memsink.py index 294688a39..f4085d52a 100644 --- a/libs/algo/tests/test_memsink.py +++ b/libs/algo/tests/test_memsink.py @@ -1,15 +1,15 @@ -import xarray as xr import dask import dask.array as da import numpy as np +import xarray as xr from odc.algo._memsink import ( Cache, CachedArray, + Token, _da_from_mem, da_mem_sink, da_yxbt_sink, da_yxt_sink, - Token, yxt_sink, ) diff --git a/libs/algo/tests/test_numeric.py b/libs/algo/tests/test_numeric.py index 5ee1c655e..5e9069f85 100644 --- a/libs/algo/tests/test_numeric.py +++ b/libs/algo/tests/test_numeric.py @@ -1,11 +1,11 @@ -import pytest import numpy as np +import pytest from odc.algo._numeric import ( half_up, np_slice_to_idx, + roi_shrink2, roundup16, shape_shrink2, - roi_shrink2, ) diff --git a/libs/algo/tests/test_percentile.py b/libs/algo/tests/test_percentile.py index 
085c8ffad..322c82cb8 100644 --- a/libs/algo/tests/test_percentile.py +++ b/libs/algo/tests/test_percentile.py @@ -1,8 +1,8 @@ -from odc.algo._percentile import np_percentile, xr_quantile_bands, xr_quantile +import dask.array as da import numpy as np import pytest -import dask.array as da import xarray as xr +from odc.algo._percentile import np_percentile, xr_quantile, xr_quantile_bands def test_np_percentile(): diff --git a/libs/cloud/odc/aio/__init__.py b/libs/cloud/odc/aio/__init__.py index 990099024..d21a50dc9 100644 --- a/libs/cloud/odc/aio/__init__.py +++ b/libs/cloud/odc/aio/__init__.py @@ -1,4 +1,5 @@ from odc.cloud._version import __version__ + from ._impl import ( S3Fetcher, auto_find_region, diff --git a/libs/cloud/odc/aws/_find.py b/libs/cloud/odc/aws/_find.py index 04c06bf63..ee46eba6b 100644 --- a/libs/cloud/odc/aws/_find.py +++ b/libs/cloud/odc/aws/_find.py @@ -1,6 +1,6 @@ -from types import SimpleNamespace from fnmatch import fnmatch from itertools import takewhile +from types import SimpleNamespace def s3_file_info(f, bucket): diff --git a/libs/cloud/odc/aws/dns.py b/libs/cloud/odc/aws/dns.py index 55577f971..c1e8df206 100644 --- a/libs/cloud/odc/aws/dns.py +++ b/libs/cloud/odc/aws/dns.py @@ -1,6 +1,6 @@ """ Tools for interacting with route53 """ -from . import mk_boto_session, _fetch_text, ec2_tags +from . import _fetch_text, ec2_tags, mk_boto_session def public_ip(): diff --git a/libs/cloud/odc/aws/misc.py b/libs/cloud/odc/aws/misc.py index f92c2582c..fdf22470e 100644 --- a/libs/cloud/odc/aws/misc.py +++ b/libs/cloud/odc/aws/misc.py @@ -1,11 +1,11 @@ -from botocore.session import get_session +import logging +from urllib.request import Request + from botocore.auth import S3SigV4Auth from botocore.awsrequest import AWSRequest -from urllib.request import Request +from botocore.session import get_session from . import auto_find_region, s3_url_parse -import logging - log = logging.getLogger(__name__) diff --git a/libs/cloud/odc/aws/queue.py b/libs/cloud/odc/aws/queue.py index 0c5ed6c93..0f72501bb 100644 --- a/libs/cloud/odc/aws/queue.py +++ b/libs/cloud/odc/aws/queue.py @@ -1,6 +1,7 @@ -import boto3 import itertools -from typing import Mapping, Any, Iterable, Optional +from typing import Any, Iterable, Mapping, Optional + +import boto3 def redrive_queue( diff --git a/libs/cloud/odc/ppt/async_thread.py b/libs/cloud/odc/ppt/async_thread.py index e56069785..6a257183e 100644 --- a/libs/cloud/odc/ppt/async_thread.py +++ b/libs/cloud/odc/ppt/async_thread.py @@ -6,7 +6,6 @@ from . 
import EOS_MARKER - log = logging.getLogger(__name__) diff --git a/libs/cloud/odc/thredds/__init__.py b/libs/cloud/odc/thredds/__init__.py index 0e10a318c..3c7bc76c0 100644 --- a/libs/cloud/odc/thredds/__init__.py +++ b/libs/cloud/odc/thredds/__init__.py @@ -1,12 +1,13 @@ """Thredds crawling and YAML fetching utilities """ -from ..cloud._version import __version__ -from thredds_crawler.crawl import Crawl -import requests -from urllib.parse import urlparse from multiprocessing.dummy import Pool as ThreadPool +from typing import List, Optional, Tuple +from urllib.parse import urlparse -from typing import List, Tuple, Optional +import requests +from thredds_crawler.crawl import Crawl + +from ..cloud._version import __version__ def thredds_find_glob( diff --git a/libs/cloud/tests/test_aws.py b/libs/cloud/tests/test_aws.py index f154de691..8a370935c 100644 --- a/libs/cloud/tests/test_aws.py +++ b/libs/cloud/tests/test_aws.py @@ -4,12 +4,11 @@ import boto3 import pytest -from moto import mock_sqs from click.testing import CliRunner - +from moto import mock_sqs from odc.apps.cloud import redrive_to_queue -from odc.aws.queue import redrive_queue, get_queues, get_queue from odc.aws._find import parse_query +from odc.aws.queue import get_queue, get_queues, redrive_queue ALIVE_QUEUE_NAME = "mock-alive-queue" DEAD_QUEUE_NAME = "mock-dead-queue" diff --git a/libs/cloud/tests/test_azure.py b/libs/cloud/tests/test_azure.py index 46a5613b7..102d03275 100644 --- a/libs/cloud/tests/test_azure.py +++ b/libs/cloud/tests/test_azure.py @@ -1,6 +1,6 @@ """Test thredds downloader code """ -from odc.azure import find_blobs, download_yamls +from odc.azure import download_yamls, find_blobs def test_find_blobs(): diff --git a/libs/cloud/tests/test_thredds.py b/libs/cloud/tests/test_thredds.py index b6afaec8f..cc9d04c86 100644 --- a/libs/cloud/tests/test_thredds.py +++ b/libs/cloud/tests/test_thredds.py @@ -1,7 +1,7 @@ """Test thredds downloader code """ import pytest -from odc.thredds import thredds_find_glob, download_yamls +from odc.thredds import download_yamls, thredds_find_glob # It's too slow to fail, disabling this for now diff --git a/libs/io/odc/io/__init__.py b/libs/io/odc/io/__init__.py index 416f2b91f..437e97306 100644 --- a/libs/io/odc/io/__init__.py +++ b/libs/io/odc/io/__init__.py @@ -1,17 +1,10 @@ """ Various file io helpers """ from ._version import __version__ -from .text import ( - parse_yaml, - read_stdin_lines, - parse_mtl, - slurp_lines, - slurp, -) from .tar import tar_doc_stream +from .text import parse_mtl, parse_yaml, read_stdin_lines, slurp, slurp_lines from .timer import RateEstimator - __all__ = ( "parse_yaml", "read_stdin_lines", diff --git a/libs/io/odc/io/cgroups.py b/libs/io/odc/io/cgroups.py index 0ff277f9e..94b0d32c8 100644 --- a/libs/io/odc/io/cgroups.py +++ b/libs/io/odc/io/cgroups.py @@ -2,6 +2,7 @@ Query Linux cgroup fs for various info """ from typing import Optional + from .text import read_int diff --git a/libs/io/odc/io/tar.py b/libs/io/odc/io/tar.py index a840ed8c1..7f00a4d79 100644 --- a/libs/io/odc/io/tar.py +++ b/libs/io/odc/io/tar.py @@ -1,9 +1,9 @@ -import tarfile import datetime import io +import itertools +import tarfile import time from pathlib import Path -import itertools def tar_mode(gzip=None, xz=None, is_pipe=None): diff --git a/libs/io/odc/io/text.py b/libs/io/odc/io/text.py index aea9fc013..fde7d9788 100644 --- a/libs/io/odc/io/text.py +++ b/libs/io/odc/io/text.py @@ -1,7 +1,6 @@ -from sys import stdin - -from typing import Tuple, Union, Dict, Any, Iterator, 
List, Optional from pathlib import Path +from sys import stdin +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union PathLike = Union[str, Path] RawDoc = Union[str, bytes] diff --git a/libs/io/tests/test_text.py b/libs/io/tests/test_text.py index 633ed8d80..80d165067 100644 --- a/libs/io/tests/test_text.py +++ b/libs/io/tests/test_text.py @@ -1,5 +1,5 @@ import pytest -from odc.io.text import parse_mtl, parse_yaml, split_and_check, parse_slice +from odc.io.text import parse_mtl, parse_slice, parse_yaml, split_and_check def test_mtl(): diff --git a/libs/ui/odc/ui/__init__.py b/libs/ui/odc/ui/__init__.py index f4604ccea..923c9c604 100644 --- a/libs/ui/odc/ui/__init__.py +++ b/libs/ui/odc/ui/__init__.py @@ -1,36 +1,26 @@ """ Notebook display helper methods. """ -from ._version import __version__ -from ._ui import ( - ui_poll, - with_ui_cbk, - simple_progress_cbk, +from ._dc_explore import DcViewer +from ._images import ( + image_aspect, + image_shape, + mk_data_uri, + mk_image_overlay, + to_jpeg_data, + to_png_data, + to_rgba, ) - from ._map import ( dss_to_geojson, gridspec_to_geojson, - zoom_from_bbox, - show_datasets, mk_map_region_selector, select_on_a_map, + show_datasets, + zoom_from_bbox, ) - -from ._images import ( - to_rgba, - image_shape, - image_aspect, - mk_data_uri, - to_png_data, - to_jpeg_data, - mk_image_overlay, -) - -from ._dc_explore import ( - DcViewer, -) - +from ._ui import simple_progress_cbk, ui_poll, with_ui_cbk +from ._version import __version__ __all__ = ( "ui_poll", diff --git a/libs/ui/odc/ui/_images.py b/libs/ui/odc/ui/_images.py index 5694368ca..622e8dcdc 100644 --- a/libs/ui/odc/ui/_images.py +++ b/libs/ui/odc/ui/_images.py @@ -1,9 +1,10 @@ """ Notebook display helper methods. """ +from typing import Optional, Tuple, Union + import numpy as np import xarray as xr -from typing import Tuple, Optional, Union -from odc.algo import to_rgba, is_rgb +from odc.algo import is_rgb, to_rgba def image_shape(d): @@ -54,6 +55,7 @@ def mk_data_uri(data: bytes, mimetype: str = "image/png") -> str: def _to_png_data2(xx: np.ndarray, mode: str = "auto") -> memoryview: from io import BytesIO + import png if mode in ("auto", None): @@ -76,9 +78,10 @@ def _to_png_data2(xx: np.ndarray, mode: str = "auto") -> memoryview: def _compress_image(im: np.ndarray, driver="PNG", **opts) -> bytes: - import rasterio import warnings + import rasterio + if im.dtype != np.uint8: raise ValueError("Only support uint8 images on input") @@ -113,8 +116,8 @@ def to_jpeg_data(im: np.ndarray, quality=95, transparent=None) -> bytes: def xr_bounds(x, crs=None) -> Tuple[Tuple[float, float], Tuple[float, float]]: - from datacube.utils.geometry import box from datacube.testutils.geom import epsg4326 + from datacube.utils.geometry import box def get_range(a: np.ndarray) -> Tuple[float, float]: b = (a[1] - a[0]) * 0.5 diff --git a/libs/ui/odc/ui/_map.py b/libs/ui/odc/ui/_map.py index e26af42d5..f8dc62007 100644 --- a/libs/ui/odc/ui/_map.py +++ b/libs/ui/odc/ui/_map.py @@ -91,7 +91,7 @@ def show_datasets( return GeoJSON(polygons) if mode == "leaflet": - from ipyleaflet import Map, GeoJSON, FullScreenControl, LayersControl + from ipyleaflet import FullScreenControl, GeoJSON, LayersControl, Map if dst is None: center = kw.pop("center", None) @@ -127,10 +127,11 @@ def show_datasets( def mk_map_region_selector(map=None, height="600px", **kwargs): - from ipyleaflet import Map, WidgetControl, FullScreenControl, DrawControl - from ipywidgets.widgets import Layout, Button, HTML from types import 
SimpleNamespace + from ipyleaflet import DrawControl, FullScreenControl, Map, WidgetControl + from ipywidgets.widgets import HTML, Button, Layout + state = SimpleNamespace(selection=None, bounds=None, done=False) btn_done = Button(description="done", layout=Layout(width="5em")) @@ -229,6 +230,7 @@ def select_on_a_map(map=None, **kwargs): ... """ from IPython.display import display + from ._ui import ui_poll m, state = mk_map_region_selector(map=map, **kwargs) @@ -236,8 +238,8 @@ def select_on_a_map(map=None, **kwargs): display(m) def extract_geometry(state): - from datacube.utils.geometry import Geometry from datacube.testutils.geom import epsg4326 + from datacube.utils.geometry import Geometry return Geometry(state.selection, epsg4326) diff --git a/libs/ui/odc/ui/_ui.py b/libs/ui/odc/ui/_ui.py index 9b47bbdb2..38f6717ef 100644 --- a/libs/ui/odc/ui/_ui.py +++ b/libs/ui/odc/ui/_ui.py @@ -7,9 +7,10 @@ def mk_cbk_ui(width="100%"): :param width: Width of the UI, for example: '80%' '200px' '30em' """ - from ipywidgets import VBox, HBox, Label, Layout, IntProgress from timeit import default_timer as t_now + from ipywidgets import HBox, IntProgress, Label, Layout, VBox + pbar = IntProgress(min=0, max=100, value=0, layout=Layout(width="100%")) lbl_right = Label("") lbl_left = Label("") diff --git a/libs/ui/odc/ui/plt_tools.py b/libs/ui/odc/ui/plt_tools.py index 82b6b7074..4da897d8d 100644 --- a/libs/ui/odc/ui/plt_tools.py +++ b/libs/ui/odc/ui/plt_tools.py @@ -2,6 +2,7 @@ Various data visualisation helper methods """ from matplotlib import pyplot as plt + from ._cmaps import scl_colormap __all__ = ( diff --git a/scripts/patch_version.py b/scripts/patch_version.py index bc4d4423e..d83b67c73 100644 --- a/scripts/patch_version.py +++ b/scripts/patch_version.py @@ -1,7 +1,8 @@ import re -from packaging import version import sys +from packaging import version + version_rgx = re.compile("^\s*__version__\s*=\s*['\"]([^'\"]*)['\"]") diff --git a/staging/dnsup/dea_dnsup/__init__.py b/staging/dnsup/dea_dnsup/__init__.py index f760e728f..17816556e 100644 --- a/staging/dnsup/dea_dnsup/__init__.py +++ b/staging/dnsup/dea_dnsup/__init__.py @@ -1,7 +1,9 @@ -from ._version import __version__ import sys + from odc.aws.dns import cli as dns_cli +from ._version import __version__ + def cli(): sys.exit(dns_cli(sys.argv[1:])) diff --git a/staging/geom/odc/geom/__init__.py b/staging/geom/odc/geom/__init__.py index 4a52b0eba..e6b6de2ae 100644 --- a/staging/geom/odc/geom/__init__.py +++ b/staging/geom/odc/geom/__init__.py @@ -1,20 +1,21 @@ """ Various Geometry Helpers """ -from ._version import __version__ +import math +from typing import Optional, Tuple, Union -from typing import Union, Tuple, Optional from affine import Affine -import math from datacube.utils.geometry import ( - decompose_rws, - split_translation, - GeoBox, CRS, + GeoBox, SomeCRS, + decompose_rws, + split_translation, ) from datacube.utils.geometry._base import _norm_crs_or_error +from ._version import __version__ + F4 = Tuple[float, float, float, float] F6 = Tuple[float, float, float, float, float, float]
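Note on the libs/cloud/odc/aio/__init__.py hunk above: the package keeps acting as a thin facade over the new ._impl module, so callers continue to import helpers such as S3Fetcher and s3_find_glob from odc.aio exactly as before. A minimal usage sketch under that assumption; the bucket name and the aws_unsigned / skip_check arguments are illustrative and not taken from this patch:

    from odc.aio import S3Fetcher, s3_find_glob

    # Anonymous S3 access; aws_unsigned=True is assumed here purely for illustration.
    s3 = S3Fetcher(aws_unsigned=True)

    # List objects matching a glob; skip_check=True avoids an extra HEAD request per object.
    # Each yielded item is expected to carry url / size / last_modified / etag attributes.
    for obj in s3_find_glob("s3://example-bucket/prefix/*/metadata.yaml", skip_check=True, s3=s3):
        print(obj.url, obj.size)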