From f316ea460feaa226e147ed6163eccc8add7c2279 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Wed, 12 Apr 2023 11:53:30 -0700
Subject: [PATCH 1/4] Faster data transfer in weather-mv rg.

Use `gcloud alpha storage`, falling back to `gsutil`, to copy data
during regridding. Further, this PR upgrades `debug`-level logs to the
`info` level.
---
 weather_mv/loader_pipeline/regrid.py | 12 +++++-------
 weather_mv/loader_pipeline/sinks.py  | 25 +++++++++++++++++++------
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/weather_mv/loader_pipeline/regrid.py b/weather_mv/loader_pipeline/regrid.py
index 91406bfd..a5a75dfc 100644
--- a/weather_mv/loader_pipeline/regrid.py
+++ b/weather_mv/loader_pipeline/regrid.py
@@ -30,7 +30,7 @@
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE
 
-from .sinks import ToDataSink, open_local
+from .sinks import ToDataSink, open_local, copy
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
@@ -240,16 +240,15 @@ def apply(self, uri: str):
             return
 
         with _metview_op():
-            logger.debug(f'Copying grib from {uri!r} to local disk.')
+            logger.info(f'Copying grib from {uri!r} to local disk.')
 
             with open_local(uri) as local_grib:
-                # TODO(alxr): Figure out way to open fieldset in memory...
-                logger.debug(f'Regridding {uri!r}.')
+                logger.info(f'Regridding {uri!r}.')
                 fs = mv.bindings.Fieldset(path=local_grib)
                 fieldset = mv.regrid(data=fs, **self.regrid_kwargs)
 
             with tempfile.NamedTemporaryFile() as src:
-                logger.debug(f'Writing {self.target_from(uri)!r} to local disk.')
+                logger.info(f'Writing {self.target_from(uri)!r} to local disk.')
                 if self.to_netcdf:
                     fieldset.to_dataset().to_netcdf(src.name)
                 else:
@@ -260,8 +259,7 @@ def apply(self, uri: str):
                 _clear_metview()
 
                 logger.info(f'Uploading {self.target_from(uri)!r}.')
-                with FileSystems().create(self.target_from(uri)) as dst:
-                    shutil.copyfileobj(src, dst, WRITE_CHUNK_SIZE)
+                copy(src.name, self.target_from(uri))
 
     def expand(self, paths):
         if not self.zarr:
diff --git a/weather_mv/loader_pipeline/sinks.py b/weather_mv/loader_pipeline/sinks.py
index ee153728..16bf9d9a 100644
--- a/weather_mv/loader_pipeline/sinks.py
+++ b/weather_mv/loader_pipeline/sinks.py
@@ -22,6 +22,7 @@
 import os
 import re
 import shutil
+import subprocess
 import tempfile
 import typing as t
 
@@ -338,15 +339,27 @@ def __open_dataset_file(filename: str,
                          False)
 
 
+def copy(src: str, dst: str) -> None:
+    """Copy data via `gcloud alpha storage` or `gsutil`."""
+    errors = []
+    for cmd in ['gcloud alpha storage cp', 'gsutil cp']:
+        try:
+            subprocess.run(cmd.split() + [src, dst], check=True, capture_output=True)
+            return
+        except subprocess.CalledProcessError as e:
+            errors.append(e)
+
+    msg = f'Failed to copy file {src!r} to {dst!r}'
+    logger.error(f'{msg} due to {errors[0].stderr.decode("utf-8")}')
+    raise EnvironmentError(msg, errors)
+
+
 @contextlib.contextmanager
 def open_local(uri: str) -> t.Iterator[str]:
     """Copy a cloud object (e.g. a netcdf, grib, or tif file) from cloud storage, like GCS, to local file."""
-    with FileSystems().open(uri) as source_file:
-        with tempfile.NamedTemporaryFile() as dest_file:
-            shutil.copyfileobj(source_file, dest_file, DEFAULT_READ_BUFFER_SIZE)
-            dest_file.flush()
-            dest_file.seek(0)
-            yield dest_file.name
+    with tempfile.NamedTemporaryFile() as dest_file:
+        copy(uri, dest_file.name)
+        yield dest_file.name
 
 
 @contextlib.contextmanager

From 95783e8b411b95e8d620cbabfbf8f9300b6094b0 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Wed, 12 Apr 2023 11:55:46 -0700
Subject: [PATCH 2/4] Fix lint issues.

---
 weather_mv/loader_pipeline/regrid.py | 2 --
 weather_mv/loader_pipeline/sinks.py  | 3 ---
 2 files changed, 5 deletions(-)

diff --git a/weather_mv/loader_pipeline/regrid.py b/weather_mv/loader_pipeline/regrid.py
index a5a75dfc..0fedb94a 100644
--- a/weather_mv/loader_pipeline/regrid.py
+++ b/weather_mv/loader_pipeline/regrid.py
@@ -27,8 +27,6 @@
 import dask
 import xarray as xr
 import xarray_beam as xbeam
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE
 
 from .sinks import ToDataSink, open_local, copy
 
diff --git a/weather_mv/loader_pipeline/sinks.py b/weather_mv/loader_pipeline/sinks.py
index 16bf9d9a..92cef90d 100644
--- a/weather_mv/loader_pipeline/sinks.py
+++ b/weather_mv/loader_pipeline/sinks.py
@@ -21,7 +21,6 @@
 import logging
 import os
 import re
-import shutil
 import subprocess
 import tempfile
 import typing as t
@@ -31,8 +30,6 @@
 import numpy as np
 import rasterio
 import xarray as xr
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.io.gcp.gcsio import DEFAULT_READ_BUFFER_SIZE
 from pyproj import Transformer
 
 TIF_TRANSFORM_CRS_TO = "EPSG:4326"

From 61e2eefdbe1a191fb18ce30f4e65f4b6765f9362 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Wed, 12 Apr 2023 14:02:58 -0700
Subject: [PATCH 3/4] Fixed `open_local`.

I've updated `open_local` to use the same decompression capabilities
provided by Apache Beam's `FileSystem` interface.
---
 weather_mv/loader_pipeline/sinks.py | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/weather_mv/loader_pipeline/sinks.py b/weather_mv/loader_pipeline/sinks.py
index 92cef90d..29522c3c 100644
--- a/weather_mv/loader_pipeline/sinks.py
+++ b/weather_mv/loader_pipeline/sinks.py
@@ -21,6 +21,7 @@
 import logging
 import os
 import re
+import shutil
 import subprocess
 import tempfile
 import typing as t
@@ -30,6 +31,7 @@
 import numpy as np
 import rasterio
 import xarray as xr
+from apache_beam.io.filesystem import CompressionTypes, FileSystem, CompressedFile, DEFAULT_READ_BUFFER_SIZE
 from pyproj import Transformer
 
 TIF_TRANSFORM_CRS_TO = "EPSG:4326"
@@ -355,8 +357,21 @@ def copy(src: str, dst: str) -> None:
 def open_local(uri: str) -> t.Iterator[str]:
     """Copy a cloud object (e.g. a netcdf, grib, or tif file) from cloud storage, like GCS, to local file."""
     with tempfile.NamedTemporaryFile() as dest_file:
+        # Transfer data with gcloud alpha storage (when available), falling back to gsutil.
         copy(uri, dest_file.name)
-        yield dest_file.name
+
+        # Check if the data is compressed. Decompress the data using the same method
+        # that Beam's FileSystems interface uses.
+        compression_type = FileSystem._get_compression_type(uri, CompressionTypes.AUTO)
+        if compression_type == CompressionTypes.UNCOMPRESSED:
+            yield dest_file.name
+            return
+
+        dest_file.seek(0)
+        with tempfile.NamedTemporaryFile() as dest_uncompressed:
+            with CompressedFile(dest_file, compression_type=compression_type) as dcomp:
+                shutil.copyfileobj(dcomp, dest_uncompressed, DEFAULT_READ_BUFFER_SIZE)
+            yield dest_uncompressed.name
 
 
 @contextlib.contextmanager

From 38df74482946db2c8548abef39feeccc70744fd1 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Wed, 12 Apr 2023 14:11:34 -0700
Subject: [PATCH 4/4] Logging all errors during `copy`.

---
 weather_mv/loader_pipeline/sinks.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/weather_mv/loader_pipeline/sinks.py b/weather_mv/loader_pipeline/sinks.py
index 29522c3c..f5e7028e 100644
--- a/weather_mv/loader_pipeline/sinks.py
+++ b/weather_mv/loader_pipeline/sinks.py
@@ -340,7 +340,7 @@ def __open_dataset_file(filename: str,
 
 def copy(src: str, dst: str) -> None:
     """Copy data via `gcloud alpha storage` or `gsutil`."""
-    errors = []
+    errors: t.List[subprocess.CalledProcessError] = []
     for cmd in ['gcloud alpha storage cp', 'gsutil cp']:
         try:
             subprocess.run(cmd.split() + [src, dst], check=True, capture_output=True)
@@ -349,7 +349,8 @@ def copy(src: str, dst: str) -> None:
             errors.append(e)
 
     msg = f'Failed to copy file {src!r} to {dst!r}'
-    logger.error(f'{msg} due to {errors[0].stderr.decode("utf-8")}')
+    err_msgs = ', '.join(map(lambda err: repr(err.stderr.decode('utf-8')), errors))
+    logger.error(f'{msg} due to {err_msgs}.')
     raise EnvironmentError(msg, errors)
 
 
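A minimal usage sketch of the two helpers this series touches, for anyone trying the change locally. The bucket and object names are hypothetical, the `weather_mv.loader_pipeline.sinks` import path is an assumption about how the package resolves, and it presumes `gcloud` or `gsutil` is installed and authenticated (plus the `cfgrib` engine for reading GRIB data):

    import xarray as xr

    from weather_mv.loader_pipeline.sinks import copy, open_local

    # Hypothetical GCS objects; any URI readable by gcloud/gsutil works.
    SRC = 'gs://example-bucket/inputs/sample.grib'
    DST = 'gs://example-bucket/outputs/sample.grib'

    # `copy` shells out to `gcloud alpha storage cp` and falls back to
    # `gsutil cp`; if both fail, it logs each captured stderr and raises
    # EnvironmentError with the accumulated CalledProcessErrors.
    copy(SRC, DST)

    # `open_local` downloads the object to a NamedTemporaryFile and, when the
    # URI suffix implies compression (e.g. '.gz'), decompresses it via Beam's
    # CompressedFile before yielding a local path. Temporary files are
    # removed when the context exits.
    with open_local(SRC) as local_path:
        ds = xr.open_dataset(local_path, engine='cfgrib')
        print(ds)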