Skip to content

Commit

Permalink
Support both virtualenv use-cases in two task groups in the example DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
LennartKloppenburg committed Oct 23, 2023
1 parent 89913c2 commit 02dbd9a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
76 changes: 54 additions & 22 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
from airflow.decorators import dag
from airflow.configuration import get_airflow_home
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -21,29 +25,57 @@
),
)

# [START virtualenv_example]
example_virtualenv = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
# virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
# normal dag parameters
@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="example_virtualenv",
default_args={"retries": 2},
)
def example_virtualenv() -> None:
start_task = EmptyOperator(task_id='start-venv-examples')
end_task = EmptyOperator(task_id='end-venv-examples')

tmp_venv_task_group = DbtTaskGroup(
group_id='tmp-venv-group',
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
# virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
)

cached_venv_task_group = DbtTaskGroup(
group_id='cached-venv-group',
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
virtualenv_dir=Path(f"{get_airflow_home()}/persistent-venv"),
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
)

start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task

example_virtualenv()
# [END virtualenv_example]

3 changes: 2 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op
@patch("cosmos.converter.DbtGraph.load")
def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog):
"""
This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` andm still pass a defined `virtualenv_dir`
This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv`
and still pass a defined `virtualenv_dir`
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir)
Expand Down

0 comments on commit 02dbd9a

Please sign in to comment.