weather-mv: Implemented streaming import of data into BigQuery. #58
Merged
Commits (13, all by alxmrs):
- 243c3b5 WIP: rough structure of the streaming feature.
- 41b38ae Merge branch 'main' of github.com:googlestaging/weather-tools into mv…
- 34461e4 Implemented and tested topic / streaming functionality.
- 25eb6dd Merge branch 'main' of github.com:googlestaging/weather-tools into mv…
- b03505a Fixed broken pickler: Now only using relative imports.
- b086c9a Import time should be re-computed for streaming pipelines.
- 4ea312f Skipping logic should only reflect URI matching.
- c126107 Nit: consistent step names.
- 11c0958 Better log statment: includes total coordinates.
- a1bb697 Merge branch 'main' of github.com:googlestaging/weather-tools into mv…
- 7873bcf Fixed unit tests.
- 12d61fa Added documentation for new streaming feature.
- d0d09fb Added filename to "skipping" log.
@@ -0,0 +1,123 @@

```python
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Window and parse Pub/Sub streams of real-time weather data added to cloud storage.

Example windowing code borrowed from:
https://cloud.google.com/pubsub/docs/pubsub-dataflow#code_sample
"""

import datetime
import fnmatch
import json
import logging
import random
import typing as t
from urllib.parse import urlparse

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class GroupMessagesByFixedWindows(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size: int, num_shards: int = 5):
        # Convert the window size from minutes to seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals" >> beam.WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> beam.ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> beam.WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # in memory for this. If not, use `beam.util.BatchElements` instead.
            | "Group by key" >> beam.GroupByKey()
        )


class AddTimestamp(beam.DoFn):
    """Processes each windowed element by extracting the message body and its
    publish time into a tuple.
    """

    def process(self, element, publish_time=beam.DoFn.TimestampParam) -> t.Iterable[t.Tuple[str, str]]:
        yield (
            element.decode("utf-8"),
            datetime.datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class ParsePaths(beam.DoFn):
    """Parse paths to real-time weather data from windowed batches."""

    def __init__(self, uri_pattern: str):
        self.uri_pattern = uri_pattern
        self.protocol = f'{urlparse(uri_pattern).scheme}://'
        super().__init__()

    @classmethod
    def try_parse_message(cls, message_body: t.Union[str, t.Dict]) -> t.Dict:
        """Robustly parse the message body, which will be JSON in the vast majority
        of cases, but might already be a dictionary."""
        try:
            return json.loads(message_body)
        except (json.JSONDecodeError, TypeError):
            if isinstance(message_body, dict):
                return message_body
            raise

    def to_object_path(self, payload: t.Dict) -> str:
        """Parse the cloud object path from a Pub/Sub topic payload."""
        return f'{self.protocol}{payload["bucket"]}/{payload["name"]}'

    def should_skip(self, message_body: t.Dict) -> bool:
        """Returns true if the Pub/Sub message does *not* match the target file URI pattern."""
        try:
            return not fnmatch.fnmatch(self.to_object_path(message_body), self.uri_pattern)
        except KeyError:
            return True

    def process(self, key_value, window=beam.DoFn.WindowParam) -> t.Iterable[str]:
        """Yield paths to real-time weather data in cloud storage."""
        shard_id, batch = key_value

        logger.debug(f'Processing shard {shard_id!r}.')

        for message_body, publish_time in batch:
            logger.debug(message_body)

            parsed_msg = self.try_parse_message(message_body)

            target = self.to_object_path(parsed_msg)
            logger.info(f'Parsed path {target!r}...')

            if self.should_skip(parsed_msg):
                logger.info(f'Skipping {target!r}.')
                continue

            yield target
```
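The path-parsing and skip logic in `ParsePaths` uses only the standard library, so it can be sketched outside of Beam. A minimal sketch, assuming a Cloud Storage "object finalize" notification payload with `bucket` and `name` fields; the bucket name, object name, and URI pattern below are made-up examples:

```python
import fnmatch
import json
from urllib.parse import urlparse

# Hypothetical Pub/Sub message body from a Cloud Storage notification.
message_body = json.dumps(
    {"bucket": "my-era5-bucket", "name": "raw/2021/era5_2021-09-01.nc"}
)

# Target pattern and derived protocol prefix, as in ParsePaths.__init__.
uri_pattern = "gs://my-era5-bucket/raw/**"
protocol = f"{urlparse(uri_pattern).scheme}://"

# to_object_path: rebuild the full object URI from the payload.
payload = json.loads(message_body)
target = f"{protocol}{payload['bucket']}/{payload['name']}"
print(target)  # gs://my-era5-bucket/raw/2021/era5_2021-09-01.nc

# should_skip: the message is dropped when the path doesn't match the pattern.
should_skip = not fnmatch.fnmatch(target, uri_pattern)
print(should_skip)  # False
```

Note that `fnmatch` wildcards match across `/` separators, so a single `*` already spans subdirectories; the glob here is deliberately permissive.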
minor note: `dims` may not give you the right set, IIRC `indexes` was a subset of `dims` (check one of the grib test files.)
Aren't the `dims` of the `coords` the correct subset? I just ran a small experiment:
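The experiment itself is not shown in the thread. As a hedged reconstruction of the kind of check being discussed, here is a minimal xarray sketch; the variable and coordinate names are made up (GRIB files often carry scalar coordinates such as a fixed level, which is what makes the coord/dim relationship subtle):

```python
import numpy as np
import xarray as xr

# Toy dataset standing in for an opened GRIB/NetCDF file.
ds = xr.Dataset(
    {"t2m": (("time", "lat", "lon"), np.zeros((2, 3, 4)))},
    coords={
        "time": [0, 1],
        "lat": [10.0, 20.0, 30.0],
        "lon": [0.0, 1.0, 2.0, 3.0],
        # A scalar coordinate, common in GRIB data: no dimension attached.
        "level": 1000,
    },
)

# The dims used by coordinates form a subset of the dataset's dims;
# scalar coords like "level" contribute no dimension at all.
coord_dims = {d for c in ds.coords.values() for d in c.dims}
print(sorted(ds.dims))     # ['lat', 'lon', 'time']
print(sorted(coord_dims))  # ['lat', 'lon', 'time']
```

In this toy case the two sets coincide, but a coordinate that is not dimensioned (or an index that covers only some dims) would make them diverge, which seems to be the reviewer's concern.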