Commit

Added Cloud Monitoring metrics to the EE ingestion pipeline.
j9sh264 committed Sep 20, 2024
1 parent f27e28d commit 57bd60d
Showing 6 changed files with 114 additions and 7 deletions.
1 change: 1 addition & 0 deletions ci3.8.yml
@@ -37,4 +37,5 @@ dependencies:
- cython==0.29.34
- earthengine-api==0.1.329
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .[test]
1 change: 1 addition & 0 deletions ci3.9.yml
@@ -37,4 +37,5 @@ dependencies:
- cython==0.29.34
- earthengine-api==0.1.329
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .[test]
1 change: 1 addition & 0 deletions environment.yml
@@ -31,6 +31,7 @@ dependencies:
- firebase-admin==6.0.1
- setuptools==70.3.0
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .
- ./weather_dl
- ./weather_mv
6 changes: 3 additions & 3 deletions weather_mv/loader_pipeline/ee.py
@@ -250,7 +250,7 @@ class ToEarthEngine(ToDataSink):
initialization_time_regex: str
forecast_time_regex: str
ingest_as_virtual_asset: bool
- use_deflate:bool
+ use_deflate: bool
use_metrics: bool
topic: str

@@ -376,8 +376,8 @@ def expand(self, paths):
| 'IngestIntoEE' >> IngestIntoEETransform.from_kwargs(**vars(self))
)

- if self.use_metrics:
-     output | 'AddMetrics' >> beam.ParDo(AddMetrics())
+ if self.use_metrics and not self.skip_region_validation:
+     output | 'AddMetrics' >> AddMetrics.from_kwargs(**vars(self))
else:
(
paths
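Editorial note, not part of the commit: the rewired branch above hands **vars(self) straight to AddMetrics.from_kwargs. Below is a minimal sketch of how a from_kwargs factory such as KwargsFactoryMixin (imported from .sinks in metrics.py further down) typically works — keeping only the keyword arguments that match the dataclass fields. KwargsFactorySketch and AddMetricsArgs are hypothetical names for illustration; the real implementation is not shown in this diff.

import dataclasses
import typing as t


class KwargsFactorySketch:
    """Hypothetical stand-in for KwargsFactoryMixin; an assumption, not code from this commit."""

    @classmethod
    def from_kwargs(cls, **kwargs: t.Any):
        # Drop any keyword arguments that are not fields of the target dataclass,
        # so a caller can pass a wide namespace such as **vars(self) wholesale.
        field_names = {f.name for f in dataclasses.fields(cls)}
        return cls(**{k: v for k, v in kwargs.items() if k in field_names})


@dataclasses.dataclass
class AddMetricsArgs(KwargsFactorySketch):
    # Mirrors the fields the AddMetrics transform declares below.
    job_name: str
    project: str
    region: str

# Example: AddMetricsArgs.from_kwargs(job_name="j", project="p", region="r", unrelated_flag=True)
# returns AddMetricsArgs(job_name="j", project="p", region="r") and ignores the extra key.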
96 changes: 93 additions & 3 deletions weather_mv/loader_pipeline/metrics.py
@@ -22,15 +22,23 @@
import time
import typing as t
from collections import OrderedDict
+ from functools import wraps

import apache_beam as beam
from apache_beam.metrics import metric
from apache_beam.transforms import window, trigger
- from functools import wraps
from google.cloud import monitoring_v3

from .sinks import get_file_time, KwargsFactoryMixin

logger = logging.getLogger(__name__)

# For Metrics API retry logic.
INITIAL_DELAY = 1.0 # Initial delay in seconds.
MAX_DELAY = 600 # Maximum delay before giving up in seconds.
NUM_RETRIES = 10 # Number of tries with exponential backoff.
TASK_QUEUE_WAIT_TIME = 120 # Task queue wait time in seconds.


def timeit(func_name: str, keyed_fn: bool = False):
"""Decorator to add time it takes for an element to be processed by a stage.
@@ -115,7 +123,7 @@ def process(self, element) -> t.Iterator[t.Any]:
yield element, time_dict


- class AddMetrics(beam.DoFn):
+ class AddBeamMetrics(beam.DoFn):
"""DoFn to add Element Processing Time metric to beam. Expects PCollection
to contain a time_dict."""

@@ -170,8 +178,90 @@ def process(self, element):
logger.info(
f"{uri}: Time from {current_step} -> {next_step}: {step_time} seconds."
)

yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000))
except Exception as e:
logger.warning(
f"Some error occured while adding metrics. Error {e}"
)


@dataclasses.dataclass
class CreateTimeSeries(beam.DoFn):
"""DoFn to write metrics TimeSeries data in Google Cloud Monitoring."""
job_name: str
project: str
region: str

def create_time_series(self, metric_name: str, metric_value: float) -> None:
"""Writes data to a Metrics TimeSeries."""
client = monitoring_v3.MetricServiceClient()
series = monitoring_v3.TimeSeries()
series.metric.type = f"custom.googleapis.com/{metric_name}"
series.metric.labels["description"] = metric_name
series.resource.type = "dataflow_job"
series.resource.labels["job_name"] = self.job_name
series.resource.labels["project_id"] = self.project
series.resource.labels["region"] = self.region

now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10**9)
interval = monitoring_v3.TimeInterval(
{"end_time": {"seconds": seconds, "nanos": nanos}}
)

point = monitoring_v3.Point(
{"interval": interval, "value": {"double_value": metric_value}}
)
series.points = [point]
client.create_time_series(
name=f"projects/{self.project}", time_series=[series]
)
logger.info(
f"Successfully created time series for {metric_name}. Metric value: {metric_value}."
)

def process(self, element: t.Any):
_, metric_values = element
data_latency_times = [x[0] for x in metric_values]
element_processing_times = [x[1] for x in metric_values]

logger.info(f"data_latency_time values: {data_latency_times}")
self.create_time_series("data_latency_time_max", max(data_latency_times))
self.create_time_series(
"data_latency_time_mean", sum(data_latency_times) / len(data_latency_times)
)

logger.info(f"element_processing_time values: {element_processing_times}")
self.create_time_series("element_processing_time_max", max(element_processing_times))
self.create_time_series(
"element_processing_time_mean", sum(element_processing_times) / len(element_processing_times)
)


@dataclasses.dataclass
class AddMetrics(beam.PTransform, KwargsFactoryMixin):
job_name: str
project: str
region: str

def expand(self, pcoll: beam.PCollection):
return (
pcoll
| "AddBeamMetrics" >> beam.ParDo(AddBeamMetrics())
| "AddTimestamps"
>> beam.Map(
lambda element: window.TimestampedValue(element, time.time())
)
| "Window"
>> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
accumulation_mode=trigger.AccumulationMode.DISCARDING,
)
| "GroupByKeyAndWindow" >> beam.GroupByKey(lambda element: element)
| "CreateTimeSeries"
>> beam.ParDo(
CreateTimeSeries(self.job_name, self.project, self.region)
)
)
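Editorial note, not part of the commit: the constants added at the top of metrics.py (INITIAL_DELAY, MAX_DELAY, NUM_RETRIES) point to exponential-backoff retries around the Cloud Monitoring write, but the wrapper itself falls outside the lines shown above. The following is only a sketch of how those constants could drive such a wrapper; retry_with_exponential_backoff is a hypothetical name, not an API from this repository.

import logging
import random
import time
import typing as t
from functools import wraps

logger = logging.getLogger(__name__)

INITIAL_DELAY = 1.0  # Initial delay in seconds.
MAX_DELAY = 600  # Maximum delay before giving up in seconds.
NUM_RETRIES = 10  # Number of tries with exponential backoff.


def retry_with_exponential_backoff(func: t.Callable) -> t.Callable:
    """Retries `func` with exponential backoff and jitter (illustrative only)."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        delay = INITIAL_DELAY
        for attempt in range(1, NUM_RETRIES + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Give up once the retry budget or the maximum delay is exhausted.
                if attempt == NUM_RETRIES or delay > MAX_DELAY:
                    raise
                logger.warning("Attempt %d failed (%s); retrying in %.1f s.", attempt, e, delay)
                time.sleep(delay + random.uniform(0, 1))
                delay *= 2

    return wrapper

Wrapping the Monitoring call, e.g. retry_with_exponential_backoff(client.create_time_series), would retry transient API errors instead of failing the bundle; this is a design suggestion, not something the commit implements.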
16 changes: 15 additions & 1 deletion weather_mv/loader_pipeline/pipeline.py
@@ -47,6 +47,16 @@ def pattern_to_uris(match_pattern: str, is_zarr: bool = False) -> t.Iterable[str
yield from [x.path for x in match.metadata_list]


def arguments_to_dict(args: t.List[str]) -> t.Dict[str, str]:
"""Converts a list of arguments to a dictionary."""
result = {}
for i in range(0, len(args), 2):
key = args[i].lstrip("-")
value = args[i + 1]
result[key] = value
return result


def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None:
all_uris = list(pattern_to_uris(known_args.uris, known_args.zarr))
if not all_uris:
@@ -75,7 +85,11 @@ def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None
elif known_args.subcommand == 'regrid' or known_args.subcommand == 'rg':
paths | "Regrid" >> Regrid.from_kwargs(**vars(known_args))
elif known_args.subcommand == 'earthengine' or known_args.subcommand == 'ee':
paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**vars(known_args))
# all_args will contain all the arguments passed to the pipeline.
all_args = {}
all_args.update(arguments_to_dict(pipeline_args))
all_args.update(**vars(known_args))
paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**all_args)
else:
raise ValueError('invalid subcommand!')

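Editorial note, not part of the commit: arguments_to_dict assumes the leftover pipeline arguments arrive as separate "--flag value" pairs; a single "--flag=value" token would not be split by this helper. An illustration with hypothetical values:

# Hypothetical values, for illustration only.
pipeline_args = ["--project", "my-project", "--region", "us-central1", "--job_name", "wx-mv-ee"]
arguments_to_dict(pipeline_args)
# -> {'project': 'my-project', 'region': 'us-central1', 'job_name': 'wx-mv-ee'}

Merging this dictionary into all_args is what lets ToEarthEngine.from_kwargs see runner options such as project, region and job_name — the same values the new AddMetrics transform uses for its Cloud Monitoring resource labels.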
