-
Notifications
You must be signed in to change notification settings - Fork 159
/
example_virtualenv.py
88 lines (76 loc) · 3 KB
/
example_virtualenv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
An example DAG that uses Cosmos to render a dbt project as an Airflow DAG.
"""
import os
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
# Location of the dbt projects shipped next to this DAG file. It can be
# overridden by setting the DBT_ROOT_PATH environment variable.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
PROJECT_NAME = "jaffle_shop"

# dbt profile built from the Airflow connection "example_conn" via Cosmos'
# Postgres user/password mapping, writing into the "public" schema.
profile_config = ProfileConfig(
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
    ),
    profile_name="default",
    target_name="dev",
)
# [START virtualenv_example]
@dag(
    # `schedule` replaces `schedule_interval`, which is deprecated since
    # Airflow 2.4 and removed in Airflow 3.
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def example_virtualenv() -> None:
    """
    Run the dbt project with Cosmos' virtualenv execution mode, two ways.

    The first task group uses a temporary virtualenv per dbt task. The second
    reuses one persistent virtualenv (``virtualenv_dir``) across its tasks.
    """
    start_task = EmptyOperator(task_id="start-venv-examples")
    end_task = EmptyOperator(task_id="end-venv-examples")

    # This first task group creates a new Cosmos virtualenv every time a task is
    # run and deletes it afterwards. It is much slower than setting
    # `virtualenv_dir`, because the requirements are installed for every task.
    tmp_venv_task_group = DbtTaskGroup(
        group_id="tmp-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / PROJECT_NAME,
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # Without setting virtualenv_dir="/some/path/persistent-venv",
            # Cosmos creates a new Python virtualenv for each dbt task being executed
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
            "emit_datasets": False,  # Example of how to not set inlets and outlets
        },
    )

    # The following task group reuses the Cosmos-managed Python virtualenv across
    # multiple tasks. It runs approximately 70% faster than the previous TaskGroup.
    cached_venv_task_group = DbtTaskGroup(
        group_id="cached-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / PROJECT_NAME,
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # Setting `virtualenv_dir` makes Cosmos create one Python virtualenv and
            # reuse it to run all the dbt tasks within the same worker node
            virtualenv_dir=Path("/tmp/persistent-venv2"),
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
        },
    )

    # Both task groups run in parallel between the start and end markers.
    start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task


example_virtualenv()
# [END virtualenv_example]