Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pyway to mange MPI migrations #824

Merged
merged 27 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
826b850
release v1.0.4
DanPaseltiner May 17, 2023
cab7781
merge
DanPaseltiner May 18, 2023
b4fa1f4
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner May 24, 2023
26400e4
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner May 26, 2023
091d8bf
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Jun 27, 2023
2a17700
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Jul 20, 2023
21bbe1c
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Jul 26, 2023
6b4b7f3
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Aug 9, 2023
05b0681
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Aug 14, 2023
0a74978
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Aug 24, 2023
2bc93d8
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Sep 6, 2023
239e54f
Add function to run pyway including unit test
DanPaseltiner Sep 7, 2023
769a5cc
Add pyway and restructure migrations file names to match required nam…
DanPaseltiner Sep 7, 2023
ce01e06
Validate and migrate MPI on start up with logging.
DanPaseltiner Sep 8, 2023
b5ae9ee
Additional comments on pyway behavior
DanPaseltiner Sep 8, 2023
3d4f4e7
Update description.md
DanPaseltiner Sep 8, 2023
589d5cc
Update test_run_pyway()
DanPaseltiner Sep 8, 2023
0c04e35
Test regular failure mode run_pyway()
DanPaseltiner Sep 8, 2023
8aa461e
Test special case of pyway validate failure when no migrations have b…
DanPaseltiner Sep 8, 2023
d1c0194
Test run_migrations() happy path.
DanPaseltiner Sep 9, 2023
47b16b6
Test run_migrations() failure modes.
DanPaseltiner Sep 9, 2023
785fba5
black
DanPaseltiner Sep 9, 2023
2088d33
remove unused util function
DanPaseltiner Sep 9, 2023
32ddc81
Merge branch 'main' of https:/CDCgov/phdi into main
DanPaseltiner Sep 9, 2023
7c2fd07
Merge branch 'main' into implement-pyway
DanPaseltiner Sep 9, 2023
5c5352a
Updates to get unit tests working with additional Pyway table.
DanPaseltiner Sep 11, 2023
5685ea3
Merge branch 'main' into implement-pyway
DanPaseltiner Sep 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion containers/record-linkage/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ COPY ./migrations /code/migrations
COPY ./assets /code/assets

EXPOSE 8080
CMD uvicorn app.main:app --host 0.0.0.0 --port 8080
CMD uvicorn app.main:app --host 0.0.0.0 --port 8080 --log-config app/log_config.yml
34 changes: 34 additions & 0 deletions containers/record-linkage/app/log_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: 1
disable_existing_loggers: False
formatters:
default:
# "()": uvicorn.logging.DefaultFormatter
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
access:
# "()": uvicorn.logging.AccessFormatter
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
default:
formatter: default
class: logging.StreamHandler
stream: ext://sys.stderr
access:
formatter: access
class: logging.StreamHandler
stream: ext://sys.stdout
loggers:
uvicorn.error:
level: INFO
handlers:
- default
propagate: no
uvicorn.access:
level: INFO
handlers:
- access
propagate: no
root:
level: DEBUG
handlers:
- default
propagate: no
58 changes: 3 additions & 55 deletions containers/record-linkage/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,14 @@
DIBBS_ENHANCED,
)
from pydantic import BaseModel, Field
from psycopg2 import OperationalError, errors
from typing import Optional
from app.utils import (
connect_to_mpi_with_env_vars,
load_mpi_env_vars_os,
read_json_from_assets,
run_migrations,
)
import psycopg2
import sys


# https://kb.objectrocket.com/postgresql/python-error-handling-with-the-psycopg2-postgresql-adapter-645
def print_psycopg2_exception(err):
# get details about the exception
err_type, _, traceback = sys.exc_info()

# get the line number when exception occured
line_num = traceback.tb_lineno

# print the connect() error
print("\npsycopg2 ERROR:", err, "on line number:", line_num)
print("psycopg2 traceback:", traceback, "-- type:", err_type)

# psycopg2 extensions.Diagnostics object attribute
print("\nextensions.Diagnostics:", err.diag)

# print the pgcode and pgerror exceptions
print("pgerror:", err.pgerror)
print("pgcode:", err.pgcode, "\n")


def run_migrations():
dbname, user, password, host = load_mpi_env_vars_os()
try:
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
)
except OperationalError as err:
# pass exception to function
print_psycopg2_exception(err)

# set the connection to 'None' in case of error
connection = None
if connection is not None:
try:
with connection.cursor() as cursor:
cursor.execute(
open(
Path(__file__).parent.parent / "migrations" / "tables.ddl", "r"
).read()
)
except errors.InFailedSqlTransaction as err:
# pass exception to function
print_psycopg2_exception(err)


# Run MPI migrations on spin up
# Ensure MPI is configured as expected.
run_migrations()

# Instantiate FastAPI via PHDI's BaseService class
Expand All @@ -80,7 +28,7 @@ def run_migrations():
).start()


# Request and and response models
# Request and response models
class LinkRecordInput(BaseModel):
"""
Schema for requests to the /link-record endpoint.
Expand Down
90 changes: 90 additions & 0 deletions containers/record-linkage/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import pathlib
from app.config import get_settings
from phdi.linkage import DIBBsConnectorClient
import subprocess
from typing import Literal
import logging


def connect_to_mpi_with_env_vars():
Expand Down Expand Up @@ -35,3 +38,90 @@ def load_mpi_env_vars_os():

def read_json_from_assets(filename: str):
return json.load(open((pathlib.Path(__file__).parent.parent / "assets" / filename)))


def run_pyway(
pyway_command: Literal["info", "validate", "migrate", "import"]
) -> subprocess.CompletedProcess:
"""
Helper function to run the pyway CLI from Python.

:param pyway_command: The specific pyway command to run.
:return: A subprocess.CompletedProcess object containing the results of the pyway
command.
"""

logger = logging.getLogger(__name__)

# Prepare the pyway command.
migrations_dir = str(pathlib.Path(__file__).parent.parent / "migrations")
settings = get_settings()
pyway_args = [
f"--database-migration-dir {migrations_dir}",
f"--database-type {settings['mpi_db_type']}",
f"--database-host {settings['mpi_host']}",
f"--database-port {settings['mpi_port']}",
f"--database-name {settings['mpi_dbname']}",
f"--database-username {settings['mpi_user']}",
f"--database-password {settings['mpi_password']}",
]

full_command = ["pyway", pyway_command] + pyway_args
full_command = " ".join(full_command)

# Attempt to run the pyway command.
try:
pyway_response = subprocess.run(
full_command, shell=True, check=True, capture_output=True
)
except subprocess.CalledProcessError as error:
error_message = error.output.decode("utf-8")

# Pyway validate returns an error if no migrations have been applied yet.
# This is expected behavior, so we can ignore this error and continue onto
# the migrations with pyway migrate. We'll encounter this error when we
# first deploy the service with a fresh database.
if (
"ERROR: no migrations applied yet, no validation necessary."
in error_message
):
logger.warning(error_message)
return subprocess.CompletedProcess(
args=full_command,
returncode=0,
stdout=None,
stderr=error_message,
)
else:
logger.error(error_message)
raise error

logger.info(pyway_response.stdout.decode("utf-8"))

return pyway_response


def run_migrations():
"""
Use the pyway CLI to ensure that the MPI database is up to date with the latest
migrations.
"""
logger = logging.getLogger(__name__)
logger.info("Validating MPI database schema...")
validation_response = run_pyway("validate")

if validation_response.returncode == 0:
logger.info("MPI database schema validations successful.")

logger.info("Migrating MPI database...")
migrations_response = run_pyway("migrate")

if migrations_response.returncode == 0:
logger.info("MPI database migrations successful.")
else:
logger.error("MPI database migrations failed.")
raise Exception(migrations_response.stderr.decode("utf-8"))

else:
logger.error("MPI database schema validations failed.")
raise Exception(validation_response.stderr.decode("utf-8"))
2 changes: 1 addition & 1 deletion containers/record-linkage/description.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ We recommend running the record linkage service from a container, but if that is
4. Make a fresh virtual environment with `python -m venv .venv`.
5. Activate the virtual environment with `source .venv/bin/activate` (MacOS and Linux), `venv\Scripts\activate` (Windows Command Prompt), or `.venv\Scripts\Activate.ps1` (Windows PowerShell).
5. Install all Python dependencies for the record linkage service with `pip install -r requirements.txt` into your virtual environment.
6. Run the FHIR Converter on `localhost:8080` with `python -m uvicorn app.main:app --host 0.0.0.0 --port 8080`.
6. Run the record linkage service on `localhost:8080` with `python -m uvicorn app.main:app --host 0.0.0.0 --port 8080 --log-config app/log_config.yml`.

### Building the Docker Image

Expand Down
3 changes: 3 additions & 0 deletions containers/record-linkage/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ fastapi==0.96.0
phdi @ git+https:/CDCgov/phdi@main
httpx
pathlib
pyway
psycopg2-binary==2.9.7
tabulate
8 changes: 5 additions & 3 deletions containers/record-linkage/tests/test_record_linkage.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def clean_up_db():
dbconn.commit()
cursor.execute("DROP TABLE IF EXISTS person")
dbconn.commit()
cursor.execute("DROP TABLE IF EXISTS pyway")
dbconn.commit()
cursor.close()
dbconn.close()

Expand Down Expand Up @@ -113,9 +115,9 @@ def test_linkage_invalid_db_type():

def test_linkage_success():
# Clear MPI ahead of testing
clean_up_db()
run_migrations()
test_bundle = load_test_bundle()
# clean_up_db()
# run_migrations()
# test_bundle = load_test_bundle()

set_mpi_env_vars()
clean_up_db()
Expand Down
Loading
Loading