Skip to content

Commit

Permalink
updated insert station props to use new DB
Browse files Browse the repository at this point in the history
  • Loading branch information
lstillwe committed Oct 31, 2023
1 parent 8a9003e commit 6030562
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 5 deletions.
153 changes: 153 additions & 0 deletions apsviz_gauges_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import os, sys
import psycopg2
import csv

from common.logging import LoggingUtil
from urllib.parse import urlparse

class APSVIZ_GAUGES_DB:
    """
    Thin psycopg2 wrapper around the APSVIZ-GAUGES postgres database.

    Connection parameters come from the APSVIZ_GAUGES_DB_* environment
    variables; the connection is opened in __init__ with autocommit enabled.
    """

    # instance_id looks like this: '2744-2021050618-namforecast'
    def __init__(self, logger, instance_id):
        """
        Open a connection to the APSVIZ-GAUGES DB.

        :param logger: logger used for all diagnostics
        :param instance_id: full run id, e.g. '2744-2021050618-namforecast'
        """
        # pre-bind so __del__ is always safe, even if the connect below fails
        self.conn = None
        self.cursor = None
        self.logger = logger

        self.user = os.getenv('APSVIZ_GAUGES_DB_USERNAME', 'user').strip()
        self.pswd = os.getenv('APSVIZ_GAUGES_DB_PASSWORD', 'password').strip()
        self.host = os.getenv('APSVIZ_GAUGES_DB_HOST', 'host').strip()
        self.port = os.getenv('APSVIZ_GAUGES_DB_PORT', '5432').strip()
        # NOTE(review): the original fallback here was '5432', a copy/paste of
        # the port default above; use a db-name-shaped fallback instead.
        # TODO confirm the intended default database name with deployment config.
        self.db_name = os.getenv('APSVIZ_GAUGES_DB_DATABASE', 'apsviz_gauges').strip()

        # save the whole id; the split parts default to it when no '-' is present
        self.instanceId = instance_id
        self.uid = instance_id
        self.instance = instance_id

        # also save the separate parts, i.e. '2744' and '2021050618-namforecast'
        parts = instance_id.split("-", 1)
        if len(parts) > 1:
            self.instance = parts[0]
            self.uid = parts[1]

        try:
            # connect to the apsviz-gauges database
            conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}'

            self.conn = psycopg2.connect(conn_str)
            self.conn.set_session(autocommit=True)
            self.cursor = self.conn.cursor()
        except Exception as e:
            # catch a named exception instead of a bare except/sys.exc_info()
            self.logger.error(f"FAILURE - Cannot connect to APSVIZ-GAUGES DB. error {e}")

    def __del__(self):
        """
        Close the DB cursor and connection, logging (not raising) any
        errors hit during teardown.
        :return:
        """
        try:
            # cursor/conn are None when __init__ failed to connect
            if self.cursor is not None:
                self.cursor.close()
            if self.conn is not None:
                self.conn.close()
        except Exception as e:
            self.logger.error(f'Error detected closing cursor or connection. {e}')

    def get_user(self):
        """Return the configured DB user name."""
        return self.user

    def get_password(self):
        """Return the configured DB password."""
        return self.pswd

    def get_host(self):
        """Return the configured DB host."""
        return self.host

    def get_port(self):
        """Return the configured DB port (as a string)."""
        return self.port

    def get_dbname(self):
        """Return the configured DB name."""
        return self.db_name

    # find the stationProps.csv file and insert the contents
    # into the stations table of the APSVIZ-GAUGES postgres instance
    def insert_station_props(self, logger, geo, worksp, csv_file_path, geoserver_host):
        """
        Read the stationProps.csv file and insert its rows into the
        stations table, building the_geom from the lon/lat columns and
        image/json/csv chart URLs from the file server host.

        Expected CSV header:
        StationId,StationName,Source,State,Lat,Lon,Node,Filename,Type

        Rows with missing required data are logged and skipped.

        :param logger: logger used for all diagnostics
        :param geo: geoserver handle (unused here; kept for interface parity)
        :param worksp: workspace name (unused here; kept for interface parity)
        :param csv_file_path: path to the stationProps.csv file
        :param geoserver_host: geoserver host name (unused here)
        :raises IOError: if a row fails to insert; caller handles this
        """
        logger.info(f"Saving {csv_file_path} to DB")
        logger.debug(f"DB name is: {self.get_dbname()}")

        # get the file server host name used to build the chart URLs
        host = os.environ.get('FILESERVER_HOST_URL', 'none').strip()

        with open(csv_file_path, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)  # Skip the header row.
            for index, row in enumerate(reader):
                try:
                    # check the row. columns that have missing data are returned
                    # (index 5, Lon, is treated as optional)
                    no_cols_data_msg: str = self.valid_csv_row(header, row, [5])

                    # if there was missing data, log it and skip this row
                    if no_cols_data_msg:
                        # fixed: the original format string dropped the column
                        # list because it had no placeholder for it
                        logger.error("Row %s had missing column data. Columns: %s", index + 2, no_cols_data_msg)
                        continue

                    logger.debug(f"opened csv file - saving this row to db: {row}")

                    # build the png/json/csv chart URLs from the row's file name
                    filename = os.path.basename(row[7])
                    base_name = os.path.splitext(filename)[0]
                    png_url = f"{host}/obs_pngs/{self.instanceId}/{filename}"
                    json_url = f"{host}/obs_pngs/{self.instanceId}/{base_name}.json"
                    csv_url = f"{host}/obs_pngs/{self.instanceId}/{base_name}.csv"

                    # the_geom point is built from lon (row[5]) / lat (row[4]);
                    # all values are passed as bind parameters
                    sql_stmt = "INSERT INTO stations (stationid, stationname, source, state, lat, lon, node, filename, the_geom, instance_id, imageurl, type, jsonurl, csvurl) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s),4326), %s, %s, %s, %s, %s)"
                    params = [row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[5], row[4], self.instanceId, png_url, row[8], json_url, csv_url]
                    logger.debug(f"sql_stmt: {sql_stmt} params: {params}")
                    self.cursor.execute(sql_stmt, params)
                except Exception as e:
                    # commit whatever made it in so far, then surface the
                    # failure as an IOError (preserving the cause) for the caller
                    self.conn.commit()
                    raise IOError(f"Error inserting station row {index + 2}") from e

            self.conn.commit()

    @staticmethod
    def valid_csv_row(header: list, row: list, optional=None) -> str:
        """
        Checks the data list to make sure there are values in each required element
        and collects the names of the columns that are missing data.

        :param header: The CSV data header
        :param row: The list of data
        :param optional: The list of indexes that are optional
        :return: A comma delimited string of the errant columns
        """
        # init the return
        no_data_col: list = []

        # if there are no optional values passed in just create an empty list
        if optional is None:
            optional = []

        # for each element in the row
        for index, value in enumerate(row):
            # is this a required value and doesn't have data
            if index not in optional and (value is None or len(value) == 0):
                # append the column to the list
                no_data_col.append(header[index])

        # return the failed cols
        return ','.join(no_data_col)
1 change: 0 additions & 1 deletion asgs_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def __init__(self, logger, dbname, instance_id):
self.pswd = os.getenv('ASGS_DB_PASSWORD', 'password').strip()
self.host = os.getenv('ASGS_DB_HOST', 'host').strip()
self.port = os.getenv('ASGS_DB_PORT', '5432').strip()
# self.db_name = os.getenv('ASGS_DB_DATABASE', 'asgs').strip()
self.db_name = dbname

# save whole Id
Expand Down
7 changes: 3 additions & 4 deletions load-geoserver-images.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#from terria_catalogV8 import TerriaCatalog
from terria_catalogV8DB import TerriaCatalogDB
from asgs_db import ASGS_DB
from apsviz_gauges_db import APSVIZ_GAUGES_DB
from zipfile import ZipFile
from general_utils import GeneralUtils

Expand Down Expand Up @@ -336,8 +337,6 @@ def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_
stations_filename = "stationProps.csv"
csv_file_path = f"{final_path}/insets/{stations_filename}"
store_name = str(instance_id) + "_station_props"
dbname = "adcirc_obs"
table_name = "stations"
style_name = "observations_style_v3"

logger.debug(f"csv_file_path: {csv_file_path} store name: {store_name}")
Expand All @@ -346,13 +345,13 @@ def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_
# check to see if stationProps.csv file exists, if so, create jndi feature layer
if os.path.isfile(csv_file_path):
# get asgs db connection
asgs_obsdb = ASGS_DB(logger, dbname, instance_id)
asgs_obsdb = APSVIZ_GAUGES_DB(logger, instance_id)
# save stationProps file to db
try: # make sure this completes before moving on - observations may not exist for this grid
asgs_obsdb.insert_station_props(logger, geo, worksp, csv_file_path, geoserver_host)
except (IOError, OSError):
e = sys.exc_info()[0]
logger.warning(f"WARNING - Cannot save station data in {dbname} DB. Error: {e}")
logger.warning(f"WARNING - Cannot save station data in APSVIZ_GAUGES DB. Error: {e}")
# TODO: Should it be returning here? return layergrp

# ... using pre-defined postgresql JNDI feature store in Geoserver
Expand Down

0 comments on commit 6030562

Please sign in to comment.