diff --git a/weather_mv/loader_pipeline/bq.py b/weather_mv/loader_pipeline/bq.py
index a1cac9f9..68850d6a 100644
--- a/weather_mv/loader_pipeline/bq.py
+++ b/weather_mv/loader_pipeline/bq.py
@@ -104,6 +104,8 @@ class ToBigQuery(ToDataSink):
     skip_creating_polygon: bool = False
     lat_grid_resolution: t.Optional[float] = None
     lon_grid_resolution: t.Optional[float] = None
+    start_date: t.Optional[str] = None
+    end_date: t.Optional[str] = None
 
     @classmethod
     def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -154,8 +156,10 @@ def validate_arguments(cls, known_args: argparse.Namespace, pipeline_args: t.Lis
         _, uri_extension = os.path.splitext(known_args.uris)
         if (uri_extension in ['.tif', '.tiff'] and not known_args.tif_metadata_for_start_time):
             raise RuntimeError("'--tif_metadata_for_start_time' is required for tif files.")
-        elif (uri_extension not in ['.tif', '.tiff'] and (known_args.tif_metadata_for_start_time
-              or known_args.tif_metadata_for_end_time)):
+        elif uri_extension not in ['.tif', '.tiff'] and (
+            known_args.tif_metadata_for_start_time
+            or known_args.tif_metadata_for_end_time
+        ):
             raise RuntimeError("'--tif_metadata_for_start_time' and "
                                "'--tif_metadata_for_end_time' can be specified only for tif files.")
 
@@ -171,6 +175,8 @@ def __post_init__(self):
         """Initializes Sink by creating a BigQuery table based on user input."""
         if self.zarr:
             self.xarray_open_dataset_kwargs = self.zarr_kwargs
+            self.start_date = self.zarr_kwargs.get('start_date')
+            self.end_date = self.zarr_kwargs.get('end_date')
 
         with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs, self.disable_grib_schema_normalization,
                           self.tif_metadata_for_start_time, self.tif_metadata_for_end_time,
                           is_zarr=self.zarr) as open_ds:
@@ -311,6 +317,10 @@ def expand(self, paths):
             xarray_open_dataset_kwargs = self.xarray_open_dataset_kwargs.copy()
             xarray_open_dataset_kwargs.pop('chunks')
             ds, chunks = xbeam.open_zarr(self.first_uri, **xarray_open_dataset_kwargs)
+
+            if self.start_date is not None and self.end_date is not None:
+                ds = ds.sel(time=slice(self.start_date, self.end_date))
+
             ds.attrs[DATA_URI_COLUMN] = self.first_uri
             extracted_rows = (
                 paths
diff --git a/weather_mv/loader_pipeline/pipeline.py b/weather_mv/loader_pipeline/pipeline.py
index 842d28c1..ef685473 100644
--- a/weather_mv/loader_pipeline/pipeline.py
+++ b/weather_mv/loader_pipeline/pipeline.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import typing as t
+import warnings
 
 import apache_beam as beam
 from apache_beam.io.filesystems import FileSystems
@@ -145,6 +146,11 @@ def run(argv: t.List[str]) -> t.Tuple[argparse.Namespace, t.List[str]]:
     if known_args.zarr_kwargs and not known_args.zarr:
         raise ValueError('`--zarr_kwargs` argument is only allowed with valid Zarr input URI.')
 
+    if known_args.zarr_kwargs:
+        if not known_args.zarr_kwargs.get('start_date') or not known_args.zarr_kwargs.get('end_date'):
+            warnings.warn('`--zarr_kwargs` does not contain both `start_date` and `end_date`, '
+                          'so the whole Zarr dataset will be ingested.')
+
     if known_args.zarr:
         known_args.zarr_kwargs['chunks'] = known_args.zarr_kwargs.get('chunks', None)
         known_args.zarr_kwargs['consolidated'] = known_args.zarr_kwargs.get('consolidated', True)