Update weather-sp's templating system to allow users to specify level and shortname. (#105)

* Attempt to read files to choose splitter, not filepaths.

* Updated templating system to let users specify level and shortname.

Fix for #102.

* Supporting output directory with template strategy.

* Removed hard-coded file endings. Fixed lint & tests breakages.

* Fix dict / tuple error found in dry-run.

* Update documentation.

* Defaulting to using the grib splitter.

* Better way to test for netCDF and Grib files.

* Updated variable from `level` to `levelType`.

* Updated readme with new var change.

* Fix file name processing, uses knowledge of standard file names.

* rm unnecessary line
alxmrs authored Feb 10, 2022
1 parent eb95533 commit 128a028
Showing 8 changed files with 130 additions and 97 deletions.
38 changes: 19 additions & 19 deletions weather_sp/README.md
@@ -19,10 +19,10 @@ Split weather data file into files by variable.
_Common options_:

* `-i, --input-pattern`: Pattern for input weather data.
-* `--output-template`: Path to template using Python formatting,
-  see [Output section](#output) below. Mutually exclusive with `--output-dir`.
-* `--output-dir`: Path to base folder for output files, see [Output section](#output) below.
-  Mutually exclusive with `--output-template`
+* `--output-template`: Path to template using Python formatting, see [Output section](#output) below. Mutually exclusive
+  with `--output-dir`.
+* `--output-dir`: Path to base folder for output files, see [Output section](#output) below. Mutually exclusive
+  with `--output-template`
* `-d, --dry-run`: Test the input file matching and the output file scheme without splitting.

Invoke with `-h` or `--help` to see the full range of options.
@@ -76,12 +76,12 @@ On the other hand, to specify a specific pattern use

## Output

-The base output file names are specified using the `--output-template` or `--output-dir` flags.
-These flags are mutually exclusive, and one of them is required.
+The base output file names are specified using the `--output-template` or `--output-dir` flags. These flags are mutually
+exclusive, and one of them is required.

### Output directory
-Based on the output directory path, the directory structure of the
-input pattern is replicated.
+
+Based on the output directory path, the directory structure of the input pattern is replicated.

To create the directory structure, the common path of the input pattern is removed from the input file path and replaced
with the output path. A '_' is added to separate the split level / variable.
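The prefix swap described above can be sketched standalone. This is a rough illustration, not the package's actual code: the helper name is invented, and the appended template suffix mirrors the `out_dir` branch added to `file_name_utils.py` in this commit.

```python
import os

def mirror_under_output(input_path: str, input_base_dir: str, output_dir: str) -> str:
    """Swap the common input prefix for the output directory, keeping the subtree."""
    stem, ending = os.path.splitext(input_path)  # separate the known file ending
    return stem.replace(input_base_dir, output_dir) + '.{levelType}{shortname}' + ending

base = mirror_under_output('gs://test-input/era5/2020/02/01.nc',
                           'gs://test-input/era5/', 'gs://test-output/splits/')
assert base == 'gs://test-output/splits/2020/02/01.{levelType}{shortname}.nc'
```

The `{levelType}` and `{shortname}` placeholders are left literal on purpose; they are filled in later, once each split's key is known.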
@@ -94,27 +94,27 @@ Example:
```

For a file `gs://test-input/era5/2020/02/01.nc` the output file pattern is
-`gs://test-output/splits/2020/02/01_` and if the temperature is a variable in that data, the output file for that
-split will be `gs://test-output/splits/2020/02/01_t.nc`
+`gs://test-output/splits/2020/02/01.{shortname}.nc` and if the temperature is a variable in that data, the output file
+for that split will be `gs://test-output/splits/2020/02/01.t.nc`

### Output template with Python-style formatting
-Using Python-style substitution (e.g. `{1}`) allows for more flexibility when creating the output files.
-The substitutions are based on the directory structure of the input file, where each `{<x>}` stands for
-one directory name, counting backwards from the end, i.e. the file name is `{0}`, the immediate
-directory in which it is located is `{1}`, and so on.
+
+Using Python-style substitution (e.g. `{1}`) allows for more flexibility when creating the output files. The
+substitutions are based on the directory structure of the input file, where each `{<x>}` stands for one directory name,
+counting backwards from the end, i.e. the file name is `{0}`, the immediate directory in which it is located is `{1}`,
+and so on. In addition, you may supply `{shortname}` or `{levelType}` in the output template. These will be filled by
+values found in each file (NetCDF files typically only have one type of level, so this variable is not needed).
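The backwards-counting substitution can be illustrated with a small standalone sketch. The function name is invented; the splitting loop and the literal-placeholder pass-through mirror the `out_pattern` branch added to `file_name_utils.py` in this commit.

```python
import os

def expand_template(input_path: str, template: str) -> str:
    """Fill {0}, {1}, ... with path parts counted backwards from the file name."""
    stem, _ = os.path.splitext(input_path)  # the file ending is handled separately
    sections = []
    path = stem
    while path:
        path, tail = os.path.split(path)
        sections.append(tail)
        if not tail:  # safety stop once the path is exhausted
            break
    # Pass literal placeholders through so they can be filled per split later.
    return template.format(*sections, shortname='{shortname}', levelType='{levelType}')

out = expand_template('gs://test-input/era5/2020/02/01.nc',
                      'gs://test-output/splits/{2}.{0}.{1}T00.{shortname}.nc')
assert out == 'gs://test-output/splits/2020.01.02T00.{shortname}.nc'
```

Note that `str.format` silently ignores the unused `levelType` keyword when the template does not mention it, which is what lets one code path serve both placeholder styles.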

Example:

```bash
--input-pattern 'gs://test-input/era5/2020/**' \
---output-template 'gs://test-output/splits/{2}.{0}.{1}T00.'
+--output-template 'gs://test-output/splits/{2}.{0}.{1}T00.{shortname}.nc'
```

For a file `gs://test-input/era5/2020/02/01.nc` the output file pattern is
-`gs://test-output/splits/2020.01.02T00.` and if the temperature is a variable in that data, the output file for that
-split will be `gs://test-output/splits/2020/01/02T00.t.nc`
-
-Note that in this case no '_' is added, the template is used as is.
+`gs://test-output/splits/2020.01.02T00.{shortname}.nc` and if the temperature is a variable in that data, the output
+file for that split will be `gs://test-output/splits/2020.01.02T00.t.nc`

## Dry run

36 changes: 20 additions & 16 deletions weather_sp/splitter_pipeline/file_name_utils.py
@@ -16,26 +16,26 @@
import os
import typing as t

-logger = logging.getLogger(__name__)
-
GRIB_FILE_ENDINGS = ('.grib', '.grb', '.grb2', '.grib2', '.gb')
NETCDF_FILE_ENDINGS = ('.nc', '.cd')
+
+logger = logging.getLogger(__name__)


class OutFileInfo(t.NamedTuple):
-    file_name_base: str
+    file_name_template: str
    ending: str
+    output_dir: bool = False

    def __str__(self):
-        return f'{self.file_name_base}/*/{self.ending}'
+        return f'{self.file_name_template}/*/{self.ending}'
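A minimal sketch of the renamed tuple's behavior (a stand-in class with the same fields as the diff): because the new `output_dir` field carries a default, the existing two-argument call sites keep working unchanged.

```python
import typing as t

class OutFileInfo(t.NamedTuple):
    file_name_template: str
    ending: str
    output_dir: bool = False  # new field; the default keeps positional call sites working

    def __str__(self):
        return f'{self.file_name_template}/*/{self.ending}'

# Two-argument construction, as in the existing tests, still works:
info = OutFileInfo('gs://out/2020/01/21.{shortname}.nc', '.nc')
assert info.output_dir is False
assert str(info) == 'gs://out/2020/01/21.{shortname}.nc/*/.nc'
```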


def get_output_file_base_name(filename: str,
-                              out_pattern: t.Optional[str],
-                              out_dir: t.Optional[str],
-                              input_base_dir: str) -> OutFileInfo:
-    """Construct the base output file name by applying the out_pattern to the
-    filename.
+                              input_base_dir: str = '',
+                              out_pattern: t.Optional[str] = None,
+                              out_dir: t.Optional[str] = None) -> OutFileInfo:
+    """Construct the base output file name by applying the out_pattern to the filename.
Example:
filename = 'gs://my_bucket/data_to_split/2020/01/21.nc'
@@ -51,21 +51,25 @@ def get_output_file_base_name(filename: str,
The output file is then created by replacing this part of the input name
with the output pattern.
"""
-    file_ending = ''
    split_name, ending = os.path.splitext(filename)
-    if ending in [*GRIB_FILE_ENDINGS, *NETCDF_FILE_ENDINGS]:
-        file_ending = ending
+    if ending in GRIB_FILE_ENDINGS or ending in NETCDF_FILE_ENDINGS:
        filename = split_name
+    else:
+        ending = ''

    if out_dir:
-        return OutFileInfo(f'{filename.replace(input_base_dir, out_dir)}_',
-                           file_ending)
+        return OutFileInfo(
+            f'{filename.replace(input_base_dir, out_dir)}.{{levelType}}{{shortname}}{ending}',
+            ending,
+            output_dir=True
+        )

    if out_pattern:
        in_sections = []
        path = filename
        while path:
            path, tail = os.path.split(path)
            in_sections.append(tail)
-        return OutFileInfo(out_pattern.format(*in_sections), file_ending)
+        return OutFileInfo(out_pattern.format(*in_sections, shortname="{shortname}", levelType="{levelType}"), ending)

-    raise ValueError('no output specified')
+    raise ValueError('no output specified.')
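The deferred substitution in the `out_pattern` branch relies on two properties of `str.format`: it accepts positional and keyword arguments together, and passing a literal `'{shortname}'` as the value leaves the placeholder intact for a later, per-split `format` call. A standalone illustration of the two-pass scheme (paths and values are made up for the example):

```python
template = 'gs://out/{2}-{1}-{0}.{levelType}_{shortname}.nc'
sections = ['21', '01', '2020']  # path parts, counted backwards from the file name

# First pass: resolve the positional (path) fields; keep the per-split
# fields as literal placeholders for the second pass.
base = template.format(*sections, shortname='{shortname}', levelType='{levelType}')
assert base == 'gs://out/2020-01-21.{levelType}_{shortname}.nc'

# Second pass, done later once the split key is known:
final = base.format(shortname='t', levelType='surface')
assert final == 'gs://out/2020-01-21.surface_t.nc'
```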
27 changes: 23 additions & 4 deletions weather_sp/splitter_pipeline/file_name_utils_test.py
@@ -24,29 +24,48 @@ def test_get_output_file_base_name_format(self):
out_pattern='gs://my_bucket/splits/{2}-{1}-{0}_old_data.',
out_dir=None,
input_base_dir='ignored')
-        self.assertEqual(out_info.file_name_base, 'gs://my_bucket/splits/2020-01-21_old_data.')
+        self.assertEqual(out_info.file_name_template, 'gs://my_bucket/splits/2020-01-21_old_data.')
self.assertEqual(out_info.ending, '.nc')

def test_get_output_file_base_name_replace(self):
out_info = get_output_file_base_name(filename='gs://my_bucket/data_to_split/2020/01/21.nc',
out_pattern=None,
out_dir='gs://my_bucket/splits/',
input_base_dir='gs://my_bucket/data_to_split/')
-        self.assertEqual(out_info.file_name_base, 'gs://my_bucket/splits/2020/01/21_')
+        self.assertEqual(out_info.file_name_template, 'gs://my_bucket/splits/2020/01/21.{levelType}{shortname}.nc')
self.assertEqual(out_info.ending, '.nc')

def test_get_output_file_base_name_format_no_fileending(self):
out_info = get_output_file_base_name(filename='gs://my_bucket/data_to_split/2020/01/21',
out_pattern='gs://my_bucket/splits/{2}-{1}-{0}_old_data.',
out_dir=None,
input_base_dir='ignored')
-        self.assertEqual(out_info.file_name_base, 'gs://my_bucket/splits/2020-01-21_old_data.')
+        self.assertEqual(out_info.file_name_template, 'gs://my_bucket/splits/2020-01-21_old_data.')
self.assertEqual(out_info.ending, '')

def test_get_output_file_base_name_format_filecontainsdots(self):
out_info = get_output_file_base_name(filename='gs://my_bucket/data_to_split/2020/01/21.T00z.stuff',
out_pattern='gs://my_bucket/splits/{2}-{1}-{0}_old_data.',
out_dir=None,
input_base_dir='ignored')
-        self.assertEqual(out_info.file_name_base, 'gs://my_bucket/splits/2020-01-21.T00z.stuff_old_data.')
+        self.assertEqual(out_info.file_name_template, 'gs://my_bucket/splits/2020-01-21.T00z.stuff_old_data.')
self.assertEqual(out_info.ending, '')

def test_accepts_shortname(self):
out_info = get_output_file_base_name(filename='gs://my_bucket/data_to_split/2020/01/21.nc',
out_pattern='gs://my_bucket/splits/{2}-{1}-{0}_old_data.{shortname}.nc',
out_dir=None,
input_base_dir='ignored')
self.assertEqual(out_info.file_name_template, 'gs://my_bucket/splits/2020-01-21_old_data.{shortname}.nc')
self.assertEqual(out_info.ending, '.nc')

def test_accepts_shortname_and_level(self):
out_info = get_output_file_base_name(
filename='gs://my_bucket/data_to_split/2020/01/21.nc',
out_pattern='gs://my_bucket/splits/{2}-{1}-{0}_old_data.{levelType}_{shortname}.nc',
out_dir=None,
input_base_dir='ignored'
)
self.assertEqual(out_info.file_name_template,
'gs://my_bucket/splits/2020-01-21_old_data.{levelType}_{shortname}.nc')
self.assertEqual(out_info.ending, '.nc')
67 changes: 31 additions & 36 deletions weather_sp/splitter_pipeline/file_splitters.py
@@ -13,29 +13,30 @@
# limitations under the License.

import abc
-import apache_beam.metrics as metrics
import logging
-import netCDF4 as nc
-import pygrib
import shutil
import tempfile
import typing as t
-from apache_beam.io.filesystems import FileSystems
from contextlib import contextmanager

-from .file_name_utils import OutFileInfo, GRIB_FILE_ENDINGS, NETCDF_FILE_ENDINGS
+import apache_beam.metrics as metrics
+import netCDF4 as nc
+import pygrib
+from apache_beam.io.filesystems import FileSystems
+
+from .file_name_utils import OutFileInfo

logger = logging.getLogger(__name__)


class SplitKey(t.NamedTuple):
-    level: str
-    short_name: str
+    levelType: str
+    shortname: str

    def __str__(self):
-        if not self.level:
-            return f'field {self.short_name}'
-        return f'{self.level} - field {self.short_name}'
+        if not self.levelType:
+            return f'field {self.shortname}'
+        return f'{self.levelType} - field {self.shortname}'


class FileSplitter(abc.ABC):
@@ -67,17 +68,14 @@ def _copy_dataset_to_storage(self, src_file: t.IO, target: str):
shutil.copyfileobj(src_file, dest_file)

    def _get_output_file_path(self, key: SplitKey) -> str:
-        level = '{level}_'.format(level=key.level) if key.level else ''
-        return '{base}{level}{sn}{ending}'.format(
-            base=self.output_info.file_name_base, level=level, sn=key.short_name,
-            ending=self.output_info.ending)
+        split_keys = key._asdict()
+        if self.output_info.output_dir and key.levelType:
+            split_keys['levelType'] = f'{key.levelType}_'
+        return self.output_info.file_name_template.format(**split_keys)
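The new `_get_output_file_path` leans on `typing.NamedTuple`'s `_asdict()` to feed the split key straight into the template. A standalone sketch of the same mechanics, using a stand-in class with the same field names as the diff:

```python
import typing as t

class SplitKey(t.NamedTuple):
    levelType: str
    shortname: str

template = 'gs://out/2020/01/21.{levelType}{shortname}.nc'
key = SplitKey(levelType='surface', shortname='t')

split_keys = key._asdict()                     # {'levelType': 'surface', 'shortname': 't'}
split_keys['levelType'] = f'{key.levelType}_'  # directory mode appends a '_' separator
path = template.format(**split_keys)
assert path == 'gs://out/2020/01/21.surface_t.nc'
```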


class GribSplitter(FileSplitter):

-    def __init__(self, input_path: str, output_info: OutFileInfo):
-        super().__init__(input_path, output_info)

def split_data(self) -> None:
outputs = dict()

@@ -106,9 +104,6 @@ def _open_outfile(self, key: SplitKey):

class NetCdfSplitter(FileSplitter):

-    def __init__(self, input_path: str, output_info: OutFileInfo):
-        super().__init__(input_path, output_info)

def split_data(self) -> None:
with self._open_dataset_locally() as nc_data:
fields = [var for var in nc_data.variables.keys() if
@@ -150,27 +145,27 @@ def _create_netcdf_dataset_for_variable(self, dataset: nc.Dataset,


class DrySplitter(FileSplitter):
-    def __init__(self, file_path: str, output_info: OutFileInfo):
-        super().__init__(file_path, output_info)

def split_data(self) -> None:
self.logger.info('input file: %s - output scheme: %s',
self.input_path, self._get_output_file_path(SplitKey('level', 'shortname')))


-def get_splitter(file_path: str, output_info: OutFileInfo,
-                 dry_run: bool) -> FileSplitter:
-    if output_info.ending in NETCDF_FILE_ENDINGS:
-        metrics.Metrics.counter('get_splitter', 'netcdf').inc()
-        if dry_run:
-            return DrySplitter(file_path, output_info)
-        return NetCdfSplitter(file_path, output_info)
-    if output_info.ending in GRIB_FILE_ENDINGS:
-        metrics.Metrics.counter('get_splitter', 'grib').inc()
-    else:
-        logger.info('unspecified file type, assuming grib for %s', file_path)
-        metrics.Metrics.counter('get_splitter',
-                                'unidentified grib').inc()
-    return GribSplitter(file_path, output_info)
+def get_splitter(file_path: str, output_info: OutFileInfo, dry_run: bool) -> FileSplitter:
+    if dry_run:
+        return DrySplitter(file_path, output_info)
+
+    with FileSystems.open(file_path) as f:
+        header = f.read(4)
+
+    if b'GRIB' in header:
+        metrics.Metrics.counter('get_splitter', 'grib').inc()
+        return GribSplitter(file_path, output_info)
+
+    # See the NetCDF Spec docs:
+    # https://docs.unidata.ucar.edu/netcdf-c/current/faq.html#How-can-I-tell-which-format-a-netCDF-file-uses
+    if b'CDF' in header or b'HDF' in header:
+        metrics.Metrics.counter('get_splitter', 'netcdf').inc()
+        return NetCdfSplitter(file_path, output_info)
+
+    raise ValueError(f'cannot determine if file {file_path!r} is Grib or NetCDF.')
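The magic-byte check above is easy to exercise without a Beam filesystem: GRIB files start with the ASCII bytes `GRIB`, classic NetCDF with `CDF`, and NetCDF-4 (an HDF5 container) with `\x89HDF`. A standalone sketch of the same classification logic (function name invented, logic mirrors the new `get_splitter`):

```python
def classify_header(header: bytes) -> str:
    """Classify a file's first four bytes as GRIB or NetCDF."""
    if b'GRIB' in header:
        return 'grib'
    # Classic NetCDF begins with b'CDF'; NetCDF-4 is an HDF5 file (b'\x89HDF').
    if b'CDF' in header or b'HDF' in header:
        return 'netcdf'
    raise ValueError('cannot determine if file is Grib or NetCDF')

assert classify_header(b'GRIB') == 'grib'
assert classify_header(b'CDF\x01') == 'netcdf'
assert classify_header(b'\x89HDF') == 'netcdf'
```

Sniffing content rather than trusting extensions is what lets the pipeline handle files like `era5_sample_grib` that carry no recognized ending.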
44 changes: 29 additions & 15 deletions weather_sp/splitter_pipeline/file_splitters_test.py
@@ -32,23 +32,29 @@

class GetSplitterTest(unittest.TestCase):

def setUp(self) -> None:
self._data_dir = f'{next(iter(weather_sp.__path__))}/test_data'

def test_get_splitter_grib(self):
-        splitter = get_splitter('some/file/path/data.grib', OutFileInfo(file_name_base='some_out', ending='.grib'),
+        splitter = get_splitter(f'{self._data_dir}/era5_sample.grib',
+                                OutFileInfo(file_name_template='some_out', ending='.grib'),
dry_run=False)
self.assertIsInstance(splitter, GribSplitter)

def test_get_splitter_nc(self):
-        splitter = get_splitter('some/file/path/data.nc', OutFileInfo(file_name_base='some_out', ending='.nc'),
+        splitter = get_splitter(f'{self._data_dir}/era5_sample.nc',
+                                OutFileInfo(file_name_template='some_out', ending='.nc'),
dry_run=False)
self.assertIsInstance(splitter, NetCdfSplitter)

-    def test_get_splitter_undetermined(self):
-        splitter = get_splitter('some/file/path/data', OutFileInfo(file_name_base='some_out', ending=''),
+    def test_get_splitter_undetermined_grib(self):
+        splitter = get_splitter(f'{self._data_dir}/era5_sample_grib',
+                                OutFileInfo(file_name_template='some_out', ending=''),
dry_run=False)
self.assertIsInstance(splitter, GribSplitter)

def test_get_splitter_dryrun(self):
-        splitter = get_splitter('some/file/path/data.grib', OutFileInfo(file_name_base='some_out', ending='.grib'),
+        splitter = get_splitter('some/file/path/data.grib', OutFileInfo(file_name_template='some_out', ending='.grib'),
dry_run=True)
self.assertIsInstance(splitter, DrySplitter)

@@ -64,20 +70,28 @@ def tearDown(self):
shutil.rmtree(split_dir)

def test_get_output_file_path(self):
-        splitter = GribSplitter('path/to/input', OutFileInfo(file_name_base='path/output/file_', ending='.grib'))
-        out = splitter._get_output_file_path(SplitKey('level', 'cc'))
-        self.assertEqual(out, 'path/output/file_level_cc.grib')
+        splitter = GribSplitter(
+            'path/to/input',
+            OutFileInfo(file_name_template='path/output/file.{levelType}_{shortname}.grib', ending='.grib')
+        )
+        out = splitter._get_output_file_path(SplitKey('surface', 'cc'))
+        self.assertEqual(out, 'path/output/file.surface_cc.grib')

@patch('apache_beam.io.filesystems.FileSystems.create')
def test_open_outfile(self, mock_io):
-        splitter = GribSplitter('path/to/input', OutFileInfo(file_name_base='path/output/file_', ending='.grib'))
-        splitter._open_outfile(SplitKey('level', 'cc'))
-        mock_io.assert_called_with('path/output/file_level_cc.grib')
+        splitter = GribSplitter(
+            'path/to/input',
+            OutFileInfo(file_name_template='path/output/file_{levelType}_{shortname}.grib', ending='.grib')
+        )
+        splitter._open_outfile(SplitKey('surface', 'cc'))
+        mock_io.assert_called_with('path/output/file_surface_cc.grib')

def test_split_data(self):
input_path = f'{self._data_dir}/era5_sample.grib'
-        splitter = GribSplitter(input_path,
-                                OutFileInfo(f'{self._data_dir}/split_files/era5_sample_', '.grib'))
+        splitter = GribSplitter(
+            input_path,
+            OutFileInfo(f'{self._data_dir}/split_files/era5_sample_{{levelType}}_{{shortname}}.grib', '.grib')
+        )
splitter.split_data()
self.assertTrue(os.path.exists(f'{self._data_dir}/split_files/'))

@@ -113,14 +127,14 @@ def tearDown(self):
shutil.rmtree(split_dir)

def test_get_output_file_path(self):
-        splitter = NetCdfSplitter('path/to/input', OutFileInfo('path/output/file_', '.nc'))
+        splitter = NetCdfSplitter('path/to/input', OutFileInfo('path/output/file_{shortname}.nc', '.nc'))
out = splitter._get_output_file_path(SplitKey('', 'cc'))
self.assertEqual(out, 'path/output/file_cc.nc')

def test_split_data(self):
input_path = f'{self._data_dir}/era5_sample.nc'
splitter = NetCdfSplitter(input_path,
-                                  OutFileInfo(f'{self._data_dir}/split_files/era5_sample_', '.nc'))
+                                  OutFileInfo(f'{self._data_dir}/split_files/era5_sample_{{shortname}}.nc', '.nc'))
splitter.split_data()
self.assertTrue(os.path.exists(f'{self._data_dir}/split_files/'))
input_data = xr.open_dataset(input_path, engine='netcdf4')