weather-mv: Implemented streaming import of data into BigQuery. #58

Merged (13 commits) on Jan 7, 2022
65 changes: 61 additions & 4 deletions weather_mv/README.md
@@ -17,7 +17,9 @@ Weather Mover loads weather data from cloud storage into Google BigQuery
## Usage

```
usage: weather-mv [-h] [-v variables [variables ...]] [-a area [area ...]] -i URIS -o OUTPUT_TABLE [--import_time IMPORT_TIME] [--infer_schema] [-d]
usage: weather-mv [-h] -i URIS -o OUTPUT_TABLE [-v variables [variables ...]] [-a area [area ...]]
[--topic TOPIC] [--window_size WINDOW_SIZE] [--num_shards NUM_SHARDS]
[--import_time IMPORT_TIME] [--infer_schema] [-d]

Weather Mover loads weather data from cloud storage into Google BigQuery.
```
@@ -26,10 +28,15 @@ _Common options_:

* `-i, --uris`: (required) URI prefix matching input netcdf objects. Ex: gs://ecmwf/era5/era5-2015-""
* `-o, --output_table`: (required) Full name of destination BigQuery table. Ex: my_project.my_dataset.my_table
* `--import_time`: When writing data to BigQuery, record that data import occurred at this time
(format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC.
* `-v, --variables`: Target variables for the BigQuery schema. Default: will import all data variables as columns.
* `-a, --area`: Target area in [N, W, S, E]. Default: Will include all available area.
* `--topic`: A Pub/Sub topic for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. E.g. 'projects/<PROJECT_ID>/topics/<TOPIC_ID>'.
* `--window_size`: Output file's window size in minutes. Only used with the `topic` flag. Default: 1.0 minute.
* `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic`
flag. Default: 5 shards.
* `--import_time`: When writing data to BigQuery, record that data import occurred at this time
(format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC.
* `--infer_schema`: Download one file in the URI pattern and infer a schema from that file. Default: off

Invoke with `-h` or `--help` to see the full range of options.
@@ -53,7 +60,7 @@ weather-mv --uris "gs://your-bucket/*.nc" \
--direct_num_workers 2
```

Upload all variables, but for a specific geographic region (for example, the contitental US):
Upload all variables, but for a specific geographic region (for example, the continental US):

```bash
weather-mv --uris "gs://your-bucket/*.nc" \
@@ -75,3 +82,53 @@ weather-mv --uris "gs://your-bucket/*.nc" \

For a full list of how to configure the Dataflow pipeline, please review
[this table](https://cloud.google.com/dataflow/docs/reference/pipeline-options).

## Streaming ingestion into BigQuery

`weather-mv` optionally provides the ability to react
to [Pub/Sub events for objects added to GCS](https://cloud.google.com/storage/docs/pubsub-notifications). This can be
used to automate ingestion into BigQuery as soon as weather data is disseminated. To set up the Weather Mover with
streaming ingestion, use the `--topic` flag (see above). Objects that don't match the `--uris` pattern will be filtered
out of ingestion. It's worth noting: when setting up Pub/Sub, **make sure to create a topic for GCS `OBJECT_FINALIZE`
events only.**
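
Creating that notification is outside the scope of this PR, but for reference, a minimal sketch using the `google-cloud-storage` client looks roughly like the following (the project, bucket, and topic names are placeholders):

```python
# Sketch (not part of this PR): create a bucket notification that fires only on
# OBJECT_FINALIZE events, with the JSON_API_V1 payload that carries the
# "bucket" and "name" fields the streaming pipeline reads.
from google.cloud import storage
from google.cloud.storage.notification import (
    JSON_API_V1_PAYLOAD_FORMAT,
    OBJECT_FINALIZE_EVENT_TYPE,
)

client = storage.Client(project='your-project')  # placeholder project
bucket = client.bucket('your-bucket')            # placeholder bucket

notification = bucket.notification(
    topic_name='your-topic-id',                  # placeholder topic
    topic_project='your-project',
    event_types=[OBJECT_FINALIZE_EVENT_TYPE],
    payload_format=JSON_API_V1_PAYLOAD_FORMAT,
)
notification.create()
```

An equivalent `gsutil` one-liner is `gsutil notification create -t <TOPIC_ID> -f json -e OBJECT_FINALIZE gs://<BUCKET>`.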

> Note: It's recommended that you specify variables to ingest (`-v, --variables`) instead of inferring the schema for
> streaming pipelines. Not all variables will be distributed with every file, especially when they are in Grib format.

_Usage Examples_:

```shell
weather-mv --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--topic "projects/$PROJECT/topics/$TOPIC_ID" \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME
```

Window incoming data every five minutes instead of every minute.

```shell
weather-mv --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--topic "projects/$PROJECT/topics/$TOPIC_ID" \
--window_size 5 \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME
```

Increase the number of shards per window.

```shell
weather-mv --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--topic "projects/$PROJECT/topics/$TOPIC_ID" \
--num_shards 10 \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME
```
58 changes: 49 additions & 9 deletions weather_mv/loader_pipeline/pipeline.py
@@ -18,9 +18,11 @@
import datetime
import itertools
import logging
import operator
import shutil
import tempfile
import typing as t
from functools import reduce

import apache_beam as beam
import apache_beam.metrics
@@ -33,6 +35,8 @@
from google.cloud import bigquery
from xarray.core.utils import ensure_us_time_resolution

from .streaming import GroupMessagesByFixedWindows, ParsePaths

DEFAULT_IMPORT_TIME = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=datetime.timezone.utc).isoformat()

DATA_IMPORT_TIME_COLUMN = 'data_import_time'
@@ -49,6 +53,10 @@ def configure_logger(verbosity: int) -> None:
logger.setLevel(level)


def _prod(xs: t.Iterable[int]) -> int:
return reduce(operator.mul, xs, 1)


def __open_dataset_file(filename: str) -> xr.Dataset:
try:
return xr.open_dataset(filename)
@@ -207,9 +215,10 @@ def get_coordinates(ds: xr.Dataset) -> t.Iterator[t.Dict]:
# Example:
# {'longitude': -108.0, 'latitude': 49.0, 'time': '2018-01-02T23:00:00+00:00'}
idx = 0
total_coords = _prod(ds.coords.dims.values())
Collaborator: minor note: dims may not give you the right set, IIRC indexes was a subset of dims (check one of the grib test files.)

Collaborator (author): Aren't the dims of the coords the correct subset? I just ran a small experiment:

>>> ds = xr.open_dataset(gb, engine='cfgrib')  # gb is a path to test_data_grib_single_timestamp
>>> [len(x) for x in ds.coords.indexes.values()]
[1801, 3600]
>>> ds.coords.dims.values()
ValuesView(Frozen({'latitude': 1801, 'longitude': 3600}))

for idx, it in enumerate(coords):
if idx % 1000 == 0:
logger.info(f'Processed {idx // 1000}k coordinates...')
logger.info(f'Processed {idx // 1000}k coordinates of {(total_coords / 1000):.2f}k...')
yield dict(zip(ds.coords.indexes, it))

logger.info(f'Finished processing all {(idx / 1000):.2f}k coordinates.')
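
As a concrete illustration of the total used in the progress log (and of the `dims` vs. `indexes` question in the review thread above), here is a toy check on a small synthetic dataset; it is not part of the PR:

```python
# Sketch: for a dataset with dimension coordinates, the product of
# ds.coords.dims.values() (what _prod computes) matches the product of the
# index lengths, i.e. the number of coordinate combinations yielded above.
import operator
from functools import reduce

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {'t2m': (('latitude', 'longitude'), np.zeros((3, 4)))},
    coords={'latitude': np.arange(3.0), 'longitude': np.arange(4.0)},
)

total_from_dims = reduce(operator.mul, ds.coords.dims.values(), 1)
total_from_indexes = reduce(operator.mul, (len(x) for x in ds.coords.indexes.values()), 1)
assert total_from_dims == total_from_indexes == 12
```
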
@@ -218,10 +227,14 @@ def get_coordinates(ds: xr.Dataset) -> t.Iterator[t.Dict]:
def extract_rows(uri: str, *,
variables: t.Optional[t.List[str]] = None,
area: t.Optional[t.List[int]] = None,
import_time: str = DEFAULT_IMPORT_TIME) -> t.Iterator[t.Dict]:
import_time: t.Optional[str] = DEFAULT_IMPORT_TIME) -> t.Iterator[t.Dict]:
"""Reads named netcdf then yields each of its rows as a dict mapping column names to values."""
logger.info(f'Extracting rows as dicts: {uri!r}.')

# re-calculate import time for streaming extractions.
if not import_time:
import_time = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()

def to_row(it: t.Dict) -> t.Dict:
"""Produce a single row, or a dictionary of all variables at a point."""

@@ -276,16 +289,25 @@ def run(argv: t.List[str], save_main_session: bool = True):
prog='weather-mv',
description='Weather Mover loads weather data from cloud storage into Google BigQuery.'
)
parser.add_argument('-i', '--uris', type=str, required=True,
help="URI prefix matching input netcdf objects, e.g. 'gs://ecmwf/era5/era5-2015-'.")
parser.add_argument('-o', '--output_table', type=str, required=True,
help="Full name of destination BigQuery table (<project>.<dataset>.<table>). Table will be "
"created if it doesn't exist.")
parser.add_argument('-v', '--variables', metavar='variables', type=str, nargs='+', default=list(),
help='Target variables for the BigQuery schema. Default: will import all data variables as '
'columns.')
parser.add_argument('-a', '--area', metavar='area', type=int, nargs='+', default=list(),
help='Target area in [N, W, S, E]. Default: Will include all available area.')
parser.add_argument('-i', '--uris', type=str, required=True,
help="URI prefix matching input netcdf objects. Ex: gs://ecmwf/era5/era5-2015-")
parser.add_argument('-o', '--output_table', type=str, required=True,
help=("Full name of destination BigQuery table (<project>.<dataset>.<table>). "
"Table will be created if it doesn't exist."))
parser.add_argument('--topic', type=str,
help="A Pub/Sub topic for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. "
"E.g. 'projects/<PROJECT_ID>/topics/<TOPIC_ID>'.")
parser.add_argument("--window_size", type=float, default=1.0,
help="Output file's window size in minutes. Only used with the `topic` flag. Default: 1.0 "
"minute.")
parser.add_argument('--num_shards', type=int, default=5,
help='Number of shards to use when writing windowed elements to cloud storage. Only used with '
'the `topic` flag. Default: 5 shards.')
parser.add_argument('--import_time', type=str, default=datetime.datetime.utcnow().isoformat(),
help=("When writing data to BigQuery, record that data import occurred at this "
"time (format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC."))
@@ -304,6 +326,13 @@ def run(argv: t.List[str], save_main_session: bool = True):
if known_args.area:
assert len(known_args.area) == 4, 'Must specify exactly 4 lat/long values for area: N, W, S, E boundaries.'

# If a topic is used, then the pipeline must be a streaming pipeline.
if known_args.topic:
pipeline_args.extend('--streaming true'.split())

# make sure we re-compute utcnow() every time rows are extracted from a file.
known_args.import_time = None

# Before starting the pipeline, read one file and generate the BigQuery
# table schema from it. Assumes the number of matching uris is
# manageable.
@@ -338,9 +367,20 @@ def run(argv: t.List[str], save_main_session: bool = True):
raise

with beam.Pipeline(options=pipeline_options) as p:
if known_args.topic:
paths = (
p
# Windowing is based on this code sample:
# https://cloud.google.com/pubsub/docs/pubsub-dataflow#code_sample
| 'ReadUploadEvent' >> beam.io.ReadFromPubSub(known_args.topic)
| 'WindowInto' >> GroupMessagesByFixedWindows(known_args.window_size, known_args.num_shards)
| 'ParsePaths' >> beam.ParDo(ParsePaths(known_args.uris))
)
else:
paths = p | 'Create' >> beam.Create(all_uris)

(
p
| 'Create' >> beam.Create(all_uris)
paths
| 'ExtractRows' >> beam.FlatMap(
extract_rows,
variables=known_args.variables,
2 changes: 1 addition & 1 deletion weather_mv/loader_pipeline/pipeline_test.py
@@ -24,7 +24,7 @@
from google.cloud.bigquery import SchemaField

import weather_mv
from weather_mv.loader_pipeline.pipeline import (
from .pipeline import (
dataset_to_table_schema,
_only_target_vars,
extract_rows,
123 changes: 123 additions & 0 deletions weather_mv/loader_pipeline/streaming.py
@@ -0,0 +1,123 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Window and parse Pub/Sub streams of real-time weather data added to cloud storage.

Example windowing code borrowed from:
https://cloud.google.com/pubsub/docs/pubsub-dataflow#code_sample
"""

import datetime
import fnmatch
import json
import logging
import random
import typing as t
from urllib.parse import urlparse

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class GroupMessagesByFixedWindows(beam.PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""

def __init__(self, window_size: int, num_shards: int = 5):
# Convert the window size from minutes to seconds.
self.window_size = int(window_size * 60)
self.num_shards = num_shards

def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| "Window into fixed intervals" >> beam.WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> beam.ParDo(AddTimestamp())
# Assign a random key to each windowed element based on the number of shards.
| "Add key" >> beam.WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# Group windowed elements by key. All the elements in the same window must fit
# memory for this. If not, you need to use `beam.util.BatchElements`.
| "Group by key" >> beam.GroupByKey()
)


class AddTimestamp(beam.DoFn):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""

def process(self, element, publish_time=beam.DoFn.TimestampParam) -> t.Iterable[t.Tuple[str, str]]:
yield (
element.decode("utf-8"),
datetime.datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)


class ParsePaths(beam.DoFn):
"""Parse paths to real-time weather data from windowed-batches."""

def __init__(self, uri_pattern: str):
self.uri_pattern = uri_pattern
self.protocol = f'{urlparse(uri_pattern).scheme}://'
super().__init__()

@classmethod
def try_parse_message(cls, message_body: t.Union[str, t.Dict]) -> t.Dict:
"""Robustly parse message body, which will be JSON in the vast majority of cases,
but might be a dictionary."""
try:
return json.loads(message_body)
except (json.JSONDecodeError, TypeError):
if type(message_body) is dict:
return message_body
raise

def to_object_path(self, payload: t.Dict) -> str:
"""Parse cloud object from Pub/Sub topic payload."""
return f'{self.protocol}{payload["bucket"]}/{payload["name"]}'

def should_skip(self, message_body: t.Dict) -> bool:
"""Returns true if Pub/Sub topic does *not* match the target file URI pattern."""
try:
return not fnmatch.fnmatch(self.to_object_path(message_body), self.uri_pattern)
except KeyError:
return True

def process(self, key_value, window=beam.DoFn.WindowParam) -> t.Iterable[str]:
"""Yield paths to real-time weather data in cloud storage."""

shard_id, batch = key_value

logger.debug(f'Processing shard {shard_id!r}.')

for message_body, publish_time in batch:
logger.debug(message_body)

parsed_msg = self.try_parse_message(message_body)

target = self.to_object_path(parsed_msg)
logger.info(f'Parsed path {target!r}...')

if self.should_skip(parsed_msg):
logger.info('skipping.')
continue

yield target
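
Not part of the PR, but as a quick illustration of the filtering behaviour above, assuming the module is importable as `weather_mv.loader_pipeline.streaming` (matching the test imports in this PR):

```python
# Sketch: ParsePaths keeps only objects whose gs:// path matches the --uris
# pattern. The payload fields shown ("bucket", "name") are the ones the DoFn
# reads from an OBJECT_FINALIZE message body.
import json

from weather_mv.loader_pipeline.streaming import ParsePaths

parser = ParsePaths('gs://your-bucket/*.nc')

matching = json.dumps({'bucket': 'your-bucket', 'name': 'era5/2022-01-01.nc'})
other = json.dumps({'bucket': 'your-bucket', 'name': 'era5/readme.txt'})

assert not parser.should_skip(parser.try_parse_message(matching))
assert parser.should_skip(parser.try_parse_message(other))
```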