Skip to content

Commit

Permalink
pull observations from the new apsviz gauges db
Browse files Browse the repository at this point in the history
  • Loading branch information
lstillwe committed Oct 31, 2023
1 parent 6030562 commit 3ed31fe
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 250 deletions.
56 changes: 8 additions & 48 deletions apsviz_db.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,24 @@
import os, sys
import logging
import base_db
import psycopg2
import json

from common.logging import LoggingUtil
from urllib.parse import urlparse

class APSVIZ_DB:
class APSVIZ_DB(base_db):

# dbname looks like this: 'asgs_dashboard'
# instance_id looks like this: '2744-2021050618-namforecast'
def __init__(self, logger):
self.conn = None
self.logger = logger

self.user = os.getenv('APSVIZ_DB_USERNAME', 'user').strip()
self.pswd = os.getenv('APSVIZ_DB_PASSWORD', 'password').strip()
self.db_name = os.getenv('APSVIZ_DB_DATABASE', 'database').strip()
self.host = os.getenv('ASGS_DB_HOST', 'host').strip()
self.port = os.getenv('ASGS_DB_PORT', '5432').strip()
user = os.getenv('APSVIZ_DB_USERNAME', 'user').strip()
pswd = os.getenv('APSVIZ_DB_PASSWORD', 'password').strip()
db_name = os.getenv('APSVIZ_DB_DATABASE', 'database').strip()
host = os.getenv('APSVIZ_DB_HOST', 'host').strip()
port = os.getenv('APSVIZ_DB_PORT', '5432').strip()

try:
# connect to asgs database
conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}'

self.conn = psycopg2.connect(conn_str)
self.conn.set_session(autocommit=True)
self.cursor = self.conn.cursor()
except:
e = sys.exc_info()[0]
self.logger.error(f"FAILURE - Cannot connect to APSVIZ DB. error {e}")

def __del__(self):
    """
    Close the cursor and then the connection when this object is destroyed.

    The constructor only assigns self.cursor after a successful connect, so
    either handle may be missing or None here; both cases are skipped quietly.
    Each handle is closed in its own try block so that a failing cursor close
    does not leave the connection open.
    :return:
    """
    for attr_name in ('cursor', 'conn'):
        handle = getattr(self, attr_name, None)
        if handle is None:
            continue
        try:
            handle.close()
        except Exception as e:
            # never let an exception escape a finalizer; report it if we can
            logger = getattr(self, 'logger', None)
            if logger is not None:
                logger.error(f'Error detected closing cursor or connection. {e}')

def get_user(self):
    """Return the database user name stored on this object."""
    return self.user


def get_password(self):
    """Return the database password stored on this object."""
    return self.pswd


def get_host(self):
    """Return the database host name stored on this object."""
    return self.host


def get_port(self):
    """Return the database port stored on this object."""
    return self.port


def get_dbname(self):
    """Return the database name stored on this object."""
    return self.db_name
super().__init__(logger, user, pswd, db_name, host, port)

def find_cat_group(self, date_str):
exists = False
Expand Down
82 changes: 30 additions & 52 deletions apsviz_gauges_db.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import os, sys
import psycopg2
import base_db
import csv

from common.logging import LoggingUtil
from urllib.parse import urlparse
class APSVIZ_GAUGES_DB(base_db):

class APSVIZ_GAUGES_DB:

# dbname looks like this: 'asgs_dashboard'
# dbname looks like this: 'apsviz_gauges'
# instance_id looks like this: '2744-2021050618-namforecast'
def __init__(self, logger, instance_id):
self.conn = None
self.logger = logger

self.user = os.getenv('APSVIZ_GAUGES_DB_USERNAME', 'user').strip()
self.pswd = os.getenv('APSVIZ_GAUGES_DB_PASSWORD', 'password').strip()
self.host = os.getenv('APSVIZ_GAUGES_DB_HOST', 'host').strip()
self.port = os.getenv('APSVIZ_GAUGES_DB_PORT', '5432').strip()
self.db_name = os.getenv('APSVIZ_GAUGES_DB_DATABASE', '5432').strip()
user = os.getenv('APSVIZ_GAUGES_DB_USERNAME', 'user').strip()
pswd = os.getenv('APSVIZ_GAUGES_DB_PASSWORD', 'password').strip()
host = os.getenv('APSVIZ_GAUGES_DB_HOST', 'host').strip()
port = os.getenv('APSVIZ_GAUGES_DB_PORT', '5432').strip()
db_name = os.getenv('APSVIZ_GAUGES_DB_DATABASE', '5432').strip()

super().__init__(logger, user, pswd, db_name, host, port)

# save whole Id
self.instanceId = instance_id
Expand All @@ -30,45 +27,6 @@ def __init__(self, logger, instance_id):
self.instance = parts[0]
self.uid = parts[1]

try:
# connect to asgs database
conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}'

self.conn = psycopg2.connect(conn_str)
self.conn.set_session(autocommit=True)
self.cursor = self.conn.cursor()
except:
e = sys.exc_info()[0]
self.logger.error(f"FAILURE - Cannot connect to APSVIZ-GAUGES DB. error {e}")

def __del__(self):
    """
    Close the cursor and then the connection when this object is destroyed.

    The constructor only assigns self.cursor after a successful connect, so
    either handle may be missing or None here; both cases are skipped quietly.
    Each handle is closed in its own try block so that a failing cursor close
    does not leave the connection open.
    :return:
    """
    for attr_name in ('cursor', 'conn'):
        handle = getattr(self, attr_name, None)
        if handle is None:
            continue
        try:
            handle.close()
        except Exception as e:
            # never let an exception escape a finalizer; report it if we can
            logger = getattr(self, 'logger', None)
            if logger is not None:
                logger.error(f'Error detected closing cursor or connection. {e}')

def get_user(self):
    """Return the database user name stored on this object."""
    return self.user


def get_password(self):
    """Return the database password stored on this object."""
    return self.pswd


def get_host(self):
    """Return the database host name stored on this object."""
    return self.host


def get_port(self):
    """Return the database port stored on this object."""
    return self.port


def get_dbname(self):
    """Return the database name stored on this object."""
    return self.db_name

# find the stationProps.csv file and insert the contents
# into the adcirc_obs db of the ASGS postgres instance
Expand Down Expand Up @@ -123,6 +81,26 @@ def insert_station_props(self, logger, geo, worksp, csv_file_path, geoserver_hos

self.conn.commit()

def isObsRun(self):
    """
    Report whether the gauges DB already has station rows for this run.

    Looks for a drf_apsviz_station row whose model_run_id equals
    self.instance.
    :return: True when a matching row exists; False when none exists or the
             lookup failed (the failure is logged, not raised)
    """
    sql_stmt = 'SELECT model_run_id FROM drf_apsviz_station WHERE model_run_id=%s'
    params = [self.instance]

    try:
        self.logger.debug(f"sql statement is: {sql_stmt} params are: {params}")
        self.cursor.execute(sql_stmt, params)
        ret = self.cursor.fetchone()
    except Exception as e:
        # The name is stored as db_name elsewhere in this file; accept either
        # spelling so that building the log message can never fail itself.
        db_name = getattr(self, 'dbname', None) or getattr(self, 'db_name', 'unknown')
        self.logger.error(f"FAILURE - Cannot retrieve instance id from {db_name}. error {e}")
        return False

    # Returning from here rather than from a finally clause lets
    # KeyboardInterrupt/SystemExit propagate instead of being swallowed.
    if ret:
        self.logger.debug(f"value returned is: {ret}")
        return True

    return False

@staticmethod
def valid_csv_row(header: list, row: list, optional=None) -> str:
"""
Expand Down
147 changes: 11 additions & 136 deletions asgs_db.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import os, sys
import logging
import psycopg2
import csv
import base_db

from common.logging import LoggingUtil
from urllib.parse import urlparse

class ASGS_DB:
class ASGS_DB(base_db):

# dbname looks like this: 'asgs_dashboard'
# instance_id looks like this: '2744-2021050618-namforecast'
def __init__(self, logger, dbname, instance_id):
self.conn = None
self.logger = logger
def __init__(self, logger, instance_id):

user = os.getenv('ASGS_DB_USERNAME', 'user').strip()
pswd = os.getenv('ASGS_DB_PASSWORD', 'password').strip()
host = os.getenv('ASGS_DB_HOST', 'host').strip()
port = os.getenv('ASGS_DB_PORT', '5432').strip()
db_name = os.getenv('ASGS_DB_DATABASE', 'db').strip()

super().__init__(logger, user, pswd, db_name, host, port)

self.user = os.getenv('ASGS_DB_USERNAME', 'user').strip()
self.pswd = os.getenv('ASGS_DB_PASSWORD', 'password').strip()
self.host = os.getenv('ASGS_DB_HOST', 'host').strip()
self.port = os.getenv('ASGS_DB_PORT', '5432').strip()
self.db_name = dbname

# save whole Id
self.instanceId = instance_id
Expand All @@ -31,46 +27,6 @@ def __init__(self, logger, dbname, instance_id):
self.instance = parts[0]
self.uid = parts[1]

try:
# connect to asgs database
conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}'

self.conn = psycopg2.connect(conn_str)
self.conn.set_session(autocommit=True)
self.cursor = self.conn.cursor()
except:
e = sys.exc_info()[0]
self.logger.error(f"FAILURE - Cannot connect to ASGS_DB. error {e}")

def __del__(self):
    """
    Close the cursor and then the connection when this object is destroyed.

    The constructor only assigns self.cursor after a successful connect, so
    either handle may be missing or None here; both cases are skipped quietly.
    Each handle is closed in its own try block so that a failing cursor close
    does not leave the connection open.
    :return:
    """
    for attr_name in ('cursor', 'conn'):
        handle = getattr(self, attr_name, None)
        if handle is None:
            continue
        try:
            handle.close()
        except Exception as e:
            # never let an exception escape a finalizer; report it if we can
            logger = getattr(self, 'logger', None)
            if logger is not None:
                logger.error(f'Error detected closing cursor or connection. {e}')

def get_user(self):
    """Return the database user name stored on this object."""
    return self.user


def get_password(self):
    """Return the database password stored on this object."""
    return self.pswd


def get_host(self):
    """Return the database host name stored on this object."""
    return self.host


def get_port(self):
    """Return the database port stored on this object."""
    return self.port


def get_dbname(self):
    """Return the database name stored on this object."""
    return self.db_name


# given instance id - save geoserver url (to access this mbtiles layer) in the asgs database
def saveImageURL(self, name, url):
Expand Down Expand Up @@ -133,84 +89,3 @@ def getRunMetadata(self):
metadata_dict['suite.project_code'] = 'asgs'
return metadata_dict

# find the stationProps.csv file and insert the contents
# into the adcirc_obs db of the ASGS postgres instance
def insert_station_props(self, logger, geo, worksp, csv_file_path, geoserver_host):
    """
    Read a stationProps.csv file and insert one stations row per data line.

    The CSV header is expected to look like:
    StationId,StationName,Source,State,Lat,Lon,Node,Filename,Type

    For every row the geometry is built from the Lon/Lat columns, the
    instance id is added, and png/json/csv URLs are derived from the
    FILESERVER_HOST_URL environment variable and the row's file name.
    Rows with missing required columns are logged and skipped.
    :param logger: logger used for progress messages and per-row problems
    :param geo: not used by this method; kept for caller compatibility
    :param worksp: not used by this method; kept for caller compatibility
    :param csv_file_path: path of the stationProps.csv file to load
    :param geoserver_host: not used by this method (see the 7/18/22 note below)
    :return: None
    :raises IOError: when a row cannot be processed or inserted; the
                     underlying exception is chained as the cause
    """
    # where to find the stationProps.csv file
    logger.info(f"Saving {csv_file_path} to DB")
    logger.debug(f"DB name is: {self.get_dbname()}")

    # get the image server host name
    # (7/18/22: stripping '.edc' from geoserver_host no longer applies for k8s runs)
    host = os.environ.get('FILESERVER_HOST_URL', 'none').strip()

    sql_stmt = "INSERT INTO stations (stationid, stationname, source, state, lat, lon, node, filename, the_geom, instance_id, imageurl, type, jsonurl, csvurl) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s),4326), %s, %s, %s, %s, %s)"

    # newline='' is what the csv module asks for so embedded newlines parse correctly
    with open(csv_file_path, 'r', newline='') as f:
        reader = csv.reader(f)

        # an empty file has no header row; treat it as nothing to insert
        header = next(reader, None)
        if header is None:
            logger.warning(f"{csv_file_path} is empty - no station rows to save")
            return

        for index, row in enumerate(reader):
            try:
                # check the row. columns that have missing data are returned
                # NOTE(review): index 5 is Lon in the header above, yet Lon feeds
                # ST_MakePoint below - confirm this is the intended optional column
                no_cols_data_msg: str = self.valid_csv_row(header, row, [5])

                # if there was missing data log it and skip the row
                if no_cols_data_msg:
                    # index + 2: one for the header line, one for 1-based numbering
                    logger.error("Row %s had missing column data. Columns: %s", index + 2, no_cols_data_msg)
                    continue

                logger.debug(f"opened csv file - saving this row to db: {row}")
                filename = os.path.basename(row[7])
                stem = os.path.splitext(filename)[0]
                url_base = f"{host}/obs_pngs/{self.instanceId}"
                png_url = f"{url_base}/{filename}"
                json_url = f"{url_base}/{stem}.json"
                csv_url = f"{url_base}/{stem}.csv"

                # ST_MakePoint takes (x, y), i.e. (lon, lat) - hence row[5] before row[4]
                params = [row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[5], row[4], self.instanceId, png_url, row[8], json_url, csv_url]
                logger.debug(f"sql_stmt: {sql_stmt} params: {params}")
                self.cursor.execute(sql_stmt, params)
            except Exception as e:
                # keep whatever was inserted so far, then report the failure
                self.conn.commit()
                raise IOError(f"Failed to save row {index + 2} of {csv_file_path}") from e

    self.conn.commit()

@staticmethod
def valid_csv_row(header: list, row: list, optional=None) -> str:
    """
    Checks the data list to make sure there are values in each required element
    and reports the columns that have none.

    A value counts as missing when it is None or has length zero. If the row
    has more elements than the header, an empty extra element is reported by
    its 1-based position ("column N") instead of raising IndexError.
    :param header: The CSV data header
    :param row: The list of data
    :param optional: The list of indexes that are optional
    :return: A comma delimited string of the errant columns (empty if none)
    """
    # if there are no optional values passed in just use an empty set
    skip = set(optional) if optional else set()

    # init the return
    no_data_col: list = []

    # for each element in the row
    for index, value in enumerate(row):
        # optional columns are never reported
        if index in skip:
            continue

        # is this a required value that doesn't have data
        if value is None or len(value) == 0:
            # fall back to a positional label when the header is too short
            label = header[index] if index < len(header) else f"column {index + 1}"
            no_data_col.append(label)

    # return the failed cols
    return ','.join(no_data_col)
Loading

0 comments on commit 3ed31fe

Please sign in to comment.