Ingested grid point as a polygon in BigQuery. #337

Merged
merged 21 commits on Jul 11, 2023
Changes from 10 commits
44 changes: 43 additions & 1 deletion weather_mv/loader_pipeline/bq.py
@@ -46,6 +46,7 @@
DATA_URI_COLUMN = 'data_uri'
DATA_FIRST_STEP = 'data_first_step'
GEO_POINT_COLUMN = 'geo_point'
GEO_POLYGON_COLUMN = 'geo_polygon'
LATITUDE_RANGE = (-90, 90)


@@ -95,6 +96,9 @@ class ToBigQuery(ToDataSink):
skip_region_validation: bool
disable_grib_schema_normalization: bool
coordinate_chunk_size: int = 10_000
should_create_polygon: bool = False
lat_grid_resolution: t.Optional[float] = None
lon_grid_resolution: t.Optional[float] = None

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -155,6 +159,20 @@ def __post_init__(self):
with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs,
self.disable_grib_schema_normalization, self.tif_metadata_for_datetime,
is_zarr=self.zarr) as open_ds:

# Find the grid resolution; for a single-point dataset there is none to derive.
if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1:
# Assume the grid is regular (evenly spaced along each axis).
latitude_length = len(open_ds['latitude'])
longitude_length = len(open_ds['longitude'])
self.lat_grid_resolution = (open_ds['latitude'][-1].values - open_ds['latitude'][0].values) / latitude_length
self.lon_grid_resolution = (open_ds['longitude'][-1].values - open_ds['longitude'][0].values) / longitude_length
self.should_create_polygon = True
else:
logger.warning("Polygon can't be genereated as dataset has a single point.")
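For intuition, here is a sketch (not part of the diff) of the signed resolutions this computes on the 0.25-degree grid assumed by the Zarr tests below, with 721 latitudes from 90 to -90 and 1440 longitudes from 0 to 359.75:

```python
# Sketch: signed grid resolutions for a 0.25-degree ERA5-style grid.
lat_grid_resolution = (-90.0 - 90.0) / 721    # ~ -0.249653 (negative: descending latitude axis)
lon_grid_resolution = (359.75 - 0.0) / 1440   # ~  0.249826
```

Because the latitude axis descends, the resolution comes out negative, and the corner arithmetic in fetch_geo_polygon relies on that sign.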

# Define table from user input
if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']:
logger.info('Creating schema from input variables.')
@@ -232,7 +250,9 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Di
row[DATA_URI_COLUMN] = uri
row[DATA_FIRST_STEP] = first_time_step
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], row['longitude'])

# Only emit a polygon when a grid resolution could be derived in __post_init__.
row[GEO_POLYGON_COLUMN] = (fetch_geo_polygon(row['latitude'], row['longitude'],
self.lat_grid_resolution, self.lon_grid_resolution)
if self.should_create_polygon else None)
# 'row' ends up looking like:
# {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
# 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
@@ -299,6 +319,7 @@ def to_table_schema(columns: t.List[t.Tuple[str, str]]) -> t.List[bigquery.Schem
fields.append(bigquery.SchemaField(DATA_URI_COLUMN, 'STRING', mode='NULLABLE'))
fields.append(bigquery.SchemaField(DATA_FIRST_STEP, 'TIMESTAMP', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POINT_COLUMN, 'GEOGRAPHY', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POLYGON_COLUMN, 'STRING', mode='NULLABLE'))

return fields
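A quick sanity check (a sketch; the input column is illustrative, not from the PR): every schema built by to_table_schema now ends with both geography columns.

```python
# to_table_schema always appends the metadata fields, so any column list yields
# a schema ending in geo_point (GEOGRAPHY) and geo_polygon (STRING, holding a
# GeoJSON polygon or NULL for single-point grids).
schema = to_table_schema([('temperature', 'FLOAT64')])  # hypothetical input column
assert schema[-2].name == GEO_POINT_COLUMN and schema[-2].field_type == 'GEOGRAPHY'
assert schema[-1].name == GEO_POLYGON_COLUMN and schema[-1].field_type == 'STRING'
```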

@@ -310,3 +331,24 @@ def fetch_geo_point(lat: float, long: float) -> str:
long = ((long + 180) % 360) - 180
point = geojson.dumps(geojson.Point((long, lat)))
return point
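The modulo keeps every point inside [-180, 180); a few worked values:

```python
# Worked examples of the longitude wrap ((long + 180) % 360) - 180.
wrap = lambda long: ((long + 180) % 360) - 180
assert wrap(359.75) == -0.25
assert wrap(200.0) == -160.0
assert wrap(180.0) == -180.0  # the antimeridian maps to -180
```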


def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str:
"""Create a Polygon based on latitude, longitude and resolution."""
lower_left = [latitude - lat_grid_resolution, longitude - lon_grid_resolution]
upper_left = [latitude - lat_grid_resolution, longitude + lon_grid_resolution]
upper_right = [latitude + lat_grid_resolution, longitude + lon_grid_resolution]
lower_right = [latitude + lat_grid_resolution, longitude - lon_grid_resolution]
lat_lon_bound = [lower_left, upper_left, upper_right, lower_right]

# Wrap any corner longitude that crosses 180 back into [-180, 180).
for i in range(len(lat_lon_bound)):
if lat_lon_bound[i][1] >= 180:
lat_lon_bound[i][1] = lat_lon_bound[i][1] - 360
polygon = geojson.dumps(geojson.Polygon([
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
(lat_lon_bound[1][0], lat_lon_bound[1][1]), # upper_left
(lat_lon_bound[2][0], lat_lon_bound[2][1]), # upper_right
(lat_lon_bound[3][0], lat_lon_bound[3][1]), # lower_right
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
]))
return polygon
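End to end, a sketch of the polygon for the polar cell of the 0.25-degree grid, using the signed resolutions worked out above (the printed string mirrors the Zarr test expectations below):

```python
# Polygon for the grid cell centered at (90.0, 0.0), with signed resolutions
# of about -0.249653 (latitude) and 0.249826 (longitude).
print(fetch_geo_polygon(90.0, 0.0, -0.249653, 0.249826))
# -> {"type": "Polygon", "coordinates": [[90.249653, -0.249826],
#     [90.249653, 0.249826], [89.750347, 0.249826], [89.750347, -0.249826],
#     [90.249653, -0.249826]]}
```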
96 changes: 87 additions & 9 deletions weather_mv/loader_pipeline/bq_test.py
@@ -74,6 +74,7 @@ def test_schema_generation(self):
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -89,6 +90,7 @@ def test_schema_generation__with_schema_normalization(self):
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -104,6 +106,7 @@ def test_schema_generation__with_target_columns(self):
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -119,6 +122,7 @@ def test_schema_generation__with_target_columns__with_schema_normalization(self)
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -135,6 +139,7 @@ def test_schema_generation__no_targets_specified(self):
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -151,6 +156,7 @@ def test_schema_generation__no_targets_specified__with_schema_normalization(self
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
]
self.assertListEqual(schema, expected_schema)

@@ -184,6 +190,7 @@ def test_schema_generation__non_index_coords(self):
SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)

]
self.assertListEqual(schema, expected_schema)
@@ -197,10 +204,12 @@ def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=N
if zarr_kwargs is None:
zarr_kwargs = {}
op = ToBigQuery.from_kwargs(first_uri=data_path, dry_run=True, zarr=zarr, zarr_kwargs=zarr_kwargs,
output_table='foo.bar.baz', variables=variables, area=area,
xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time, infer_schema=False,
tif_metadata_for_datetime=tif_metadata_for_datetime, skip_region_validation=True,
disable_grib_schema_normalization=disable_grib_schema_normalization, coordinate_chunk_size=1000)
output_table='foo.bar.baz', variables=variables, area=area,
xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time,
infer_schema=False, tif_metadata_for_datetime=tif_metadata_for_datetime,
skip_region_validation=True,
disable_grib_schema_normalization=disable_grib_schema_normalization,
coordinate_chunk_size=1000)
coords = op.prepare_coordinates(data_path)
for uri, chunk in coords:
yield from op.extract_rows(uri, chunk)
@@ -245,6 +254,10 @@ def test_extract_rows(self):
'u10': 3.4776244163513184,
'v10': 0.03294110298156738,
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(49.198347, -108.197674), (49.198347, -107.802326),
(48.801653, -107.802326), (48.801653, -108.197674),
(49.198347, -108.197674)]))
}
self.assertRowsEqual(actual, expected)
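The expected corners fall straight out of the signed resolutions implied by this test grid, roughly -0.198347 in latitude and 0.197674 in longitude. A sketch of the arithmetic (not test code from the PR):

```python
# Corner arithmetic behind the expected geo_polygon above.
lat, lon = 49.0, -108.0
lat_res, lon_res = -0.198347, 0.197674  # signed resolutions implied by the corners
assert round(lat - lat_res, 6) == 49.198347    # northern corners
assert round(lat + lat_res, 6) == 48.801653    # southern corners
assert round(lon - lon_res, 6) == -108.197674  # western corners
assert round(lon + lon_res, 6) == -107.802326  # eastern corners
```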

@@ -259,6 +272,10 @@ def test_extract_rows__with_subset_variables(self):
'time': '2018-01-02T06:00:00+00:00',
'u10': 3.4776244163513184,
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(49.198347, -108.197674), (49.198347, -107.802326),
(48.801653, -107.802326), (48.801653, -108.197674),
(49.198347, -108.197674)]))
}
self.assertRowsEqual(actual, expected)

@@ -275,6 +292,10 @@ def test_extract_rows__specific_area(self):
'u10': 2.73445987701416,
'v10': 0.08277571201324463,
'geo_point': geojson.dumps(geojson.Point((-103.0, 45.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(45.198347, -103.197674), (45.198347, -102.802326),
(44.801653, -102.802326), (44.801653, -103.197674),
(45.198347, -103.197674)]))
}
self.assertRowsEqual(actual, expected)

@@ -291,6 +312,10 @@ def test_extract_rows__specific_area_float_points(self):
'u10': 3.94743275642395,
'v10': -0.19749987125396729,
'geo_point': geojson.dumps(geojson.Point((-103.400002, 45.200001))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(45.398348, -103.597676), (45.398348, -103.202327),
(45.001654, -103.202327), (45.001654, -103.597676),
(45.398348, -103.597676)]))
}
self.assertRowsEqual(actual, expected)

@@ -307,7 +332,11 @@ def test_extract_rows__specify_import_time(self):
'time': '2018-01-02T06:00:00+00:00',
'u10': 3.4776244163513184,
'v10': 0.03294110298156738,
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0)))
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(49.198347, -108.197674), (49.198347, -107.802326),
(48.801653, -107.802326), (48.801653, -108.197674),
(49.198347, -108.197674)]))
}
self.assertRowsEqual(actual, expected)

@@ -324,7 +353,8 @@ def test_extract_rows_single_point(self):
'time': '2018-01-02T06:00:00+00:00',
'u10': 3.4776244163513184,
'v10': 0.03294110298156738,
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0)))
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
'geo_polygon': None
}
self.assertRowsEqual(actual, expected)

@@ -342,6 +372,10 @@ def test_extract_rows_nan(self):
'u10': None,
'v10': 0.03294110298156738,
'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(49.198347, -108.197674), (49.198347, -107.802326),
(48.801653, -107.802326), (48.801653, -108.197674),
(49.198347, -108.197674)]))
}
self.assertRowsEqual(actual, expected)

@@ -384,12 +418,16 @@ def test_extract_rows_zarr(self):
'longitude': 0,
'time': '1959-01-01T00:00:00+00:00',
'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.249653, -0.249826), (90.249653, 0.249826),
(89.750347, 0.249826), (89.750347, -0.249826),
(90.249653, -0.249826)]))
}
self.assertRowsEqual(actual, expected)

def test_droping_variable_while_opening_zarr(self):
input_path = os.path.join(self.test_data_folder, 'test_data.zarr')
actual = next(self.extract(input_path, zarr=True, zarr_kwargs={ 'drop_variables': ['cape'] }))
actual = next(self.extract(input_path, zarr=True, zarr_kwargs={'drop_variables': ['cape']}))
expected = {
'd2m': 237.5404052734375,
'data_import_time': '1970-01-01T00:00:00+00:00',
Expand All @@ -399,6 +437,10 @@ def test_droping_variable_while_opening_zarr(self):
'longitude': 0,
'time': '1959-01-01T00:00:00+00:00',
'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.249653, -0.249826), (90.249653, 0.249826),
(89.750347, 0.249826), (89.750347, -0.249826),
(90.249653, -0.249826)]))
}
self.assertRowsEqual(actual, expected)

@@ -420,7 +462,11 @@ def test_extract_rows(self):
'latitude': 42.09783344918844,
'longitude': -123.66686981141397,
'time': '2020-07-01T00:00:00+00:00',
'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833)))
'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(42.102297, -123.672837), (42.102297, -123.660901),
(42.093375, -123.660901), (42.093375, -123.672837),
(42.102297, -123.672837)]))
}
self.assertRowsEqual(actual, expected)

@@ -447,6 +493,10 @@ def test_extract_rows(self):
'valid_time': '2021-10-18T06:00:00+00:00',
'z': 1.42578125,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -461,6 +511,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__without_schema_norm
'longitude': -180.0,
'z': 1.42578125,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -477,6 +531,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__without_schema_n
'step': 0,
'z': 1.42578125,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -491,6 +549,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__with_schema_normali
'longitude': -180.0,
'surface_0_00_instant_z': 1.42578125,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -506,6 +568,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__with_schema_norm
'step': 0,
'surface_0_00_instant_z': 1.42578125,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -559,6 +625,10 @@ def test_multiple_editions__without_schema_normalization(self):
'v200': -3.6647186279296875,
'valid_time': '2021-12-10T20:00:00+00:00',
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -614,7 +684,11 @@ def test_multiple_editions__with_schema_normalization(self):
'surface_0_00_instant_tprate': 0.0,
'surface_0_00_instant_ceil': 179.17018127441406,
'valid_time': '2021-12-10T20:00:00+00:00',
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0)))
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)

@@ -634,6 +708,10 @@ def test_multiple_editions__with_vars__includes_coordinates_in_vars__with_schema
'depthBelowLandLayer_0_00_instant_stl1': 251.02520751953125,
'depthBelowLandLayer_7_00_instant_stl2': 253.54124450683594,
'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
'geo_polygon': geojson.dumps(geojson.Polygon([
(90.099944, -180.099972), (90.099944, -179.900028),
(89.900056, -179.900028), (89.900056, -180.099972),
(90.099944, -180.099972)]))
}
self.assertRowsEqual(actual, expected)
