diff --git a/containers/record-linkage/Dockerfile b/containers/record-linkage/Dockerfile index f0e5d21709..e01a0f0431 100644 --- a/containers/record-linkage/Dockerfile +++ b/containers/record-linkage/Dockerfile @@ -15,4 +15,4 @@ COPY ./migrations /code/migrations COPY ./assets /code/assets EXPOSE 8080 -CMD uvicorn app.main:app --host 0.0.0.0 --port 8080 \ No newline at end of file +CMD uvicorn app.main:app --host 0.0.0.0 --port 8080 --log-config app/log_config.yml \ No newline at end of file diff --git a/containers/record-linkage/app/log_config.yml b/containers/record-linkage/app/log_config.yml new file mode 100644 index 0000000000..cd209d6028 --- /dev/null +++ b/containers/record-linkage/app/log_config.yml @@ -0,0 +1,34 @@ +version: 1 +disable_existing_loggers: False +formatters: + default: + # "()": uvicorn.logging.DefaultFormatter + format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + access: + # "()": uvicorn.logging.AccessFormatter + format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' +handlers: + default: + formatter: default + class: logging.StreamHandler + stream: ext://sys.stderr + access: + formatter: access + class: logging.StreamHandler + stream: ext://sys.stdout +loggers: + uvicorn.error: + level: INFO + handlers: + - default + propagate: no + uvicorn.access: + level: INFO + handlers: + - access + propagate: no +root: + level: DEBUG + handlers: + - default + propagate: no \ No newline at end of file diff --git a/containers/record-linkage/app/main.py b/containers/record-linkage/app/main.py index 4b58b1e57c..a8a4f972e5 100644 --- a/containers/record-linkage/app/main.py +++ b/containers/record-linkage/app/main.py @@ -10,66 +10,14 @@ DIBBS_ENHANCED, ) from pydantic import BaseModel, Field -from psycopg2 import OperationalError, errors from typing import Optional from app.utils import ( connect_to_mpi_with_env_vars, - load_mpi_env_vars_os, read_json_from_assets, + run_migrations, ) -import psycopg2 -import sys - -# https://kb.objectrocket.com/postgresql/python-error-handling-with-the-psycopg2-postgresql-adapter-645 -def print_psycopg2_exception(err): - # get details about the exception - err_type, _, traceback = sys.exc_info() - - # get the line number when exception occured - line_num = traceback.tb_lineno - - # print the connect() error - print("\npsycopg2 ERROR:", err, "on line number:", line_num) - print("psycopg2 traceback:", traceback, "-- type:", err_type) - - # psycopg2 extensions.Diagnostics object attribute - print("\nextensions.Diagnostics:", err.diag) - - # print the pgcode and pgerror exceptions - print("pgerror:", err.pgerror) - print("pgcode:", err.pgcode, "\n") - - -def run_migrations(): - dbname, user, password, host = load_mpi_env_vars_os() - try: - connection = psycopg2.connect( - dbname=dbname, - user=user, - password=password, - host=host, - ) - except OperationalError as err: - # pass exception to function - print_psycopg2_exception(err) - - # set the connection to 'None' in case of error - connection = None - if connection is not None: - try: - with connection.cursor() as cursor: - cursor.execute( - open( - Path(__file__).parent.parent / "migrations" / "tables.ddl", "r" - ).read() - ) - except errors.InFailedSqlTransaction as err: - # pass exception to function - print_psycopg2_exception(err) - - -# Run MPI migrations on spin up +# Ensure MPI is configured as expected. run_migrations() # Instantiate FastAPI via PHDI's BaseService class @@ -80,7 +28,7 @@ def run_migrations(): ).start() -# Request and and response models +# Request and response models class LinkRecordInput(BaseModel): """ Schema for requests to the /link-record endpoint. diff --git a/containers/record-linkage/app/utils.py b/containers/record-linkage/app/utils.py index 792424b773..363f6e8580 100644 --- a/containers/record-linkage/app/utils.py +++ b/containers/record-linkage/app/utils.py @@ -2,6 +2,9 @@ import pathlib from app.config import get_settings from phdi.linkage import DIBBsConnectorClient +import subprocess +from typing import Literal +import logging def connect_to_mpi_with_env_vars(): @@ -35,3 +38,90 @@ def load_mpi_env_vars_os(): def read_json_from_assets(filename: str): return json.load(open((pathlib.Path(__file__).parent.parent / "assets" / filename))) + + +def run_pyway( + pyway_command: Literal["info", "validate", "migrate", "import"] +) -> subprocess.CompletedProcess: + """ + Helper function to run the pyway CLI from Python. + + :param pyway_command: The specific pyway command to run. + :return: A subprocess.CompletedProcess object containing the results of the pyway + command. + """ + + logger = logging.getLogger(__name__) + + # Prepare the pyway command. + migrations_dir = str(pathlib.Path(__file__).parent.parent / "migrations") + settings = get_settings() + pyway_args = [ + f"--database-migration-dir {migrations_dir}", + f"--database-type {settings['mpi_db_type']}", + f"--database-host {settings['mpi_host']}", + f"--database-port {settings['mpi_port']}", + f"--database-name {settings['mpi_dbname']}", + f"--database-username {settings['mpi_user']}", + f"--database-password {settings['mpi_password']}", + ] + + full_command = ["pyway", pyway_command] + pyway_args + full_command = " ".join(full_command) + + # Attempt to run the pyway command. + try: + pyway_response = subprocess.run( + full_command, shell=True, check=True, capture_output=True + ) + except subprocess.CalledProcessError as error: + error_message = error.output.decode("utf-8") + + # Pyway validate returns an error if no migrations have been applied yet. + # This is expected behavior, so we can ignore this error and continue onto + # the migrations with pyway migrate. We'll encounter this error when we + # first deploy the service with a fresh database. + if ( + "ERROR: no migrations applied yet, no validation necessary." + in error_message + ): + logger.warning(error_message) + return subprocess.CompletedProcess( + args=full_command, + returncode=0, + stdout=None, + stderr=error_message, + ) + else: + logger.error(error_message) + raise error + + logger.info(pyway_response.stdout.decode("utf-8")) + + return pyway_response + + +def run_migrations(): + """ + Use the pyway CLI to ensure that the MPI database is up to date with the latest + migrations. + """ + logger = logging.getLogger(__name__) + logger.info("Validating MPI database schema...") + validation_response = run_pyway("validate") + + if validation_response.returncode == 0: + logger.info("MPI database schema validations successful.") + + logger.info("Migrating MPI database...") + migrations_response = run_pyway("migrate") + + if migrations_response.returncode == 0: + logger.info("MPI database migrations successful.") + else: + logger.error("MPI database migrations failed.") + raise Exception(migrations_response.stderr.decode("utf-8")) + + else: + logger.error("MPI database schema validations failed.") + raise Exception(validation_response.stderr.decode("utf-8")) diff --git a/containers/record-linkage/description.md b/containers/record-linkage/description.md index 05dc52eac1..c826a6571b 100644 --- a/containers/record-linkage/description.md +++ b/containers/record-linkage/description.md @@ -43,7 +43,7 @@ We recommend running the record linkage service from a container, but if that is 4. Make a fresh virtual environment with `python -m venv .venv`. 5. Activate the virtual environment with `source .venv/bin/activate` (MacOS and Linux), `venv\Scripts\activate` (Windows Command Prompt), or `.venv\Scripts\Activate.ps1` (Windows PowerShell). 5. Install all Python dependencies for the record linkage service with `pip install -r requirements.txt` into your virtual environment. -6. Run the FHIR Converter on `localhost:8080` with `python -m uvicorn app.main:app --host 0.0.0.0 --port 8080`. +6. Run the record linkage service on `localhost:8080` with `python -m uvicorn app.main:app --host 0.0.0.0 --port 8080 --log-config app/log_config.yml`. ### Building the Docker Image diff --git a/containers/record-linkage/migrations/tables.ddl b/containers/record-linkage/migrations/V01_01__initial_schema.sql similarity index 100% rename from containers/record-linkage/migrations/tables.ddl rename to containers/record-linkage/migrations/V01_01__initial_schema.sql diff --git a/containers/record-linkage/requirements.txt b/containers/record-linkage/requirements.txt index 5d80345b34..0a08ef9d14 100644 --- a/containers/record-linkage/requirements.txt +++ b/containers/record-linkage/requirements.txt @@ -5,3 +5,6 @@ fastapi==0.96.0 phdi @ git+https://github.com/CDCgov/phdi@main httpx pathlib +pyway +psycopg2-binary==2.9.7 +tabulate \ No newline at end of file diff --git a/containers/record-linkage/tests/test_record_linkage.py b/containers/record-linkage/tests/test_record_linkage.py index e6b31eea13..d0446f2f6a 100644 --- a/containers/record-linkage/tests/test_record_linkage.py +++ b/containers/record-linkage/tests/test_record_linkage.py @@ -56,6 +56,8 @@ def clean_up_db(): dbconn.commit() cursor.execute("DROP TABLE IF EXISTS person") dbconn.commit() + cursor.execute("DROP TABLE IF EXISTS pyway") + dbconn.commit() cursor.close() dbconn.close() @@ -113,9 +115,9 @@ def test_linkage_invalid_db_type(): def test_linkage_success(): # Clear MPI ahead of testing - clean_up_db() - run_migrations() - test_bundle = load_test_bundle() + # clean_up_db() + # run_migrations() + # test_bundle = load_test_bundle() set_mpi_env_vars() clean_up_db() diff --git a/containers/record-linkage/tests/test_utils.py b/containers/record-linkage/tests/test_utils.py new file mode 100644 index 0000000000..ad71829407 --- /dev/null +++ b/containers/record-linkage/tests/test_utils.py @@ -0,0 +1,158 @@ +from app.utils import run_pyway, run_migrations +from unittest import mock +import pathlib +from typing import Literal +import pytest +import subprocess + + +MOCK_SETTINGS = { + "mpi_db_type": "postgres", + "mpi_host": "localhost", + "mpi_port": "5432", + "mpi_dbname": "testdb", + "mpi_user": "postgres", + "mpi_password": "pw", +} + + +def make_pyway_command( + pyway_command: Literal["info", "validate", "migrate", "import"] +) -> str: + """ + Helper function for tests that require a pyway command. + :param pyway_command: The specific pyway command to run. + :return: A string containing the pyway command. + """ + + migrations_dir = str(pathlib.Path(__file__).parent.parent / "migrations") + + pyway_command = " ".join( + [ + "pyway", + pyway_command, + f"--database-migration-dir {migrations_dir}", + f"--database-type {MOCK_SETTINGS['mpi_db_type']}", + f"--database-host {MOCK_SETTINGS['mpi_host']}", + f"--database-port {MOCK_SETTINGS['mpi_port']}", + f"--database-name {MOCK_SETTINGS['mpi_dbname']}", + f"--database-username {MOCK_SETTINGS['mpi_user']}", + f"--database-password {MOCK_SETTINGS['mpi_password']}", + ] + ) + return pyway_command + + +@mock.patch("app.utils.get_settings") +@mock.patch("app.utils.subprocess.run") +def test_run_pyway_success(patched_subprocess, patched_get_settings): + """ + Test the happy path in run_pyway() + """ + global MOCK_SETTINGS + patched_get_settings.return_value = MOCK_SETTINGS + run_pyway("info") + pyway_command = make_pyway_command("info") + patched_subprocess.assert_called_once_with( + pyway_command, + shell=True, + check=True, + capture_output=True, + ) + + +@mock.patch("app.utils.get_settings") +@mock.patch("app.utils.subprocess.run") +def test_run_pyway_failure(patched_subprocess, patched_get_settings): + """ + The general failure mode of run_pyway() when a subprocess.CalledProcessError is + raised. + """ + + global MOCK_SETTINGS + patched_get_settings.return_value = MOCK_SETTINGS + output = mock.Mock() + output.decode.return_value = "test" + patched_subprocess.side_effect = subprocess.CalledProcessError( + returncode=1, cmd="test", stderr="test", output=output + ) + pyway_command = make_pyway_command("info") + with pytest.raises(subprocess.CalledProcessError): + run_pyway("info") + patched_subprocess.assert_called_once_with( + pyway_command, + shell=True, + check=True, + capture_output=True, + ) + + +@mock.patch("app.utils.get_settings") +@mock.patch("app.utils.subprocess.run") +def test_run_pyway_no_migrations(patched_subprocess, patched_get_settings): + """ + Test the special case where 'pyway validate' returns an error if no migrations have + been applied yet. + """ + + global MOCK_SETTINGS + patched_get_settings.return_value = MOCK_SETTINGS + output = mock.Mock() + output.decode.return_value = ( + "ERROR: no migrations applied yet, no validation necessary." + ) + patched_subprocess.side_effect = subprocess.CalledProcessError( + returncode=1, cmd="test", stderr="test", output=output + ) + pyway_command = make_pyway_command("validate") + run_pyway("validate") + patched_subprocess.assert_called_once_with( + pyway_command, + shell=True, + check=True, + capture_output=True, + ) + + +@mock.patch("app.utils.run_pyway") +def test_run_migrations_success(patched_run_pyway): + """ + Test the happy path in run_migrations() + """ + validation_response = mock.Mock() + validation_response.returncode = 0 + migration_response = mock.Mock() + migration_response.returncode = 0 + patched_run_pyway.side_effect = [validation_response, migration_response] + run_migrations() + patched_run_pyway.assert_has_calls([mock.call("validate"), mock.call("migrate")]) + + +@mock.patch("app.utils.run_pyway") +def test_run_migrations_validation_failure(patched_run_pyway): + """ + Test the case where the validation step fails in run_migrations(). + """ + validation_response = mock.Mock() + validation_response.returncode = 1 + migration_response = mock.Mock() + migration_response.returncode = 0 + patched_run_pyway.side_effect = [validation_response, migration_response] + with pytest.raises(Exception): + run_migrations() + patched_run_pyway.assert_called_once_with("validate") + + +@mock.patch("app.utils.run_pyway") +def test_run_migrations_migration_failure(patched_run_pyway): + """ + Test the case where the migration step fails in run_migrations(). + """ + validation_response = mock.Mock() + validation_response.returncode = 0 + migration_response = mock.Mock() + migration_response.returncode = 1 + patched_run_pyway.side_effect = [validation_response, migration_response] + with pytest.raises(Exception): + run_migrations() + patched_run_pyway.assert_has_calls([mock.call("validate"), mock.call("migrate")])