
Commit

Address review comments from @tatiana
pankajkoti committed Sep 30, 2024
1 parent 7ea9ee8 commit 3565723
Showing 6 changed files with 17 additions and 12 deletions.
8 changes: 4 additions & 4 deletions cosmos/airflow/graph.py
@@ -8,6 +8,7 @@

from cosmos.config import RenderConfig
from cosmos.constants import (
+ DBT_COMPILE_TASK_ID,
DEFAULT_DBT_RESOURCES,
TESTABLE_DBT_RESOURCES,
DbtResourceType,
@@ -20,7 +21,6 @@
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger
- from cosmos.settings import dbt_compile_task_id

logger = get_logger(__name__)

@@ -264,17 +264,17 @@ def _add_dbt_compile_task(
return

compile_task_metadata = TaskMetadata(
- id=dbt_compile_task_id,
+ id=DBT_COMPILE_TASK_ID,
operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
arguments=task_args,
extra_context={},
)
compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=None)
- tasks_map[dbt_compile_task_id] = compile_airflow_task
+ tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task

for node_id, node in nodes.items():
if not node.depends_on and node_id in tasks_map:
- tasks_map[dbt_compile_task_id] >> tasks_map[node_id]
+ tasks_map[DBT_COMPILE_TASK_ID] >> tasks_map[node_id]


def build_airflow_graph(
2 changes: 2 additions & 0 deletions cosmos/constants.py
@@ -148,3 +148,5 @@ def _missing_value_(cls, value): # type: ignore
# It expects that you have already created those resources through the appropriate commands.
# https://docs.getdbt.com/reference/commands/test
TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}
+
+ DBT_COMPILE_TASK_ID = "dbt_compile"
3 changes: 2 additions & 1 deletion cosmos/operators/local.py
@@ -9,6 +9,7 @@
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
+ from urllib.parse import urlparse

import jinja2
from airflow import DAG
@@ -268,7 +269,7 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:

remote_conn_id = remote_target_path_conn_id
if not remote_conn_id:
- target_path_schema = target_path_str.split("://")[0]
+ target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
return None, None
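
Note on the change above: ``urlparse(...).scheme`` extracts only the URL scheme, whereas splitting on ``"://"`` returns the whole string when no scheme is present. A minimal sketch of the difference (the paths are illustrative, not taken from this commit):

    from urllib.parse import urlparse

    # Scheme extraction for typical remote target paths.
    urlparse("gs://my-bucket/target").scheme      # -> "gs"
    urlparse("s3://my-bucket/target").scheme      # -> "s3"

    # A plain local path has no scheme, so the lookup in
    # FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP simply misses and None is returned.
    urlparse("/tmp/target").scheme                # -> ""

    # The previous approach returned the entire string when "://" was absent:
    "/tmp/target".split("://")[0]                 # -> "/tmp/target"
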
1 change: 0 additions & 1 deletion cosmos/settings.py
@@ -34,7 +34,6 @@
remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None)
remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None)

- dbt_compile_task_id = conf.get("cosmos", "dbt_compile_task_id", fallback="dbt_compile")
remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)

9 changes: 6 additions & 3 deletions docs/getting_started/execution-modes.rst
@@ -12,7 +12,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)
- 8. **airflow_async**: (Introduced since Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__
+ 8. **airflow_async**: (Experimental and introduced since Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__

The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

@@ -244,7 +244,7 @@ Each task will create a new Cloud Run Job execution, giving full isolation. The
)
Airflow Async (experimental)
- -------------
+ ----------------------------

.. versionadded:: 1.7.0

@@ -260,14 +260,17 @@ the ``dbt compile`` command on your dbt project which then outputs compiled SQLs
As part of the same task run, these compiled SQLs are then stored remotely to a remote path set using the
:ref:`remote_target_path` configuration. The remote path is then used by the subsequent tasks in the DAG to
fetch (from the remote path) and run the compiled SQLs asynchronously using e.g. the ``DbtRunAirflowAsyncOperator``.
- You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs remotely,
+ You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs
+ remotely (e.g. for the classic ``jaffle_shop`` dbt project, upon compiling it produces about 31 files measuring about 124KB in total, but on a local
+ machine it took approximately 25 seconds for the task to compile & upload the compiled SQLs to the remote path).,
however, it is still a win as it is one-time overhead and the subsequent tasks run asynchronously utilising the Airflow's
deferrable operators and supplying to them those compiled SQLs.

Note that currently, the ``airflow_async`` execution mode has the following limitations and is released as Experimental:

1. Only supports the ``dbt resource type`` models to be run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands as they are in the ``local`` execution mode.
2. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode.
+ 3. Only works for ``full_refresh`` models. There is pending work to support other modes.

Example DAG:

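As a rough illustration of the flow documented above (a single ``dbt_compile`` task uploads the compiled SQL to ``remote_target_path``, and the downstream model tasks fetch and run it via deferrable operators), a DAG using this mode might be configured along the following lines. This is a hedged sketch: the project path, bucket URL, connection id, and profile values are illustrative assumptions, not taken from this commit.

    import os
    from datetime import datetime

    from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig

    # Illustrative values for the [cosmos] remote_target_path settings
    # (normally set in airflow.cfg or via environment variables).
    os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "gs://my-bucket/target"
    os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "gcp_conn"  # assumed connection id

    airflow_async_dag = DbtDag(
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),  # assumed project path
        profile_config=ProfileConfig(
            profile_name="airflow_db",
            target_name="bq",  # limitation 2: the profile target is assumed to be BigQuery
            profiles_yml_filepath="/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml",
        ),
        execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
        operator_args={"full_refresh": True},  # limitation 3: only full_refresh models are supported
        schedule=None,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        dag_id="airflow_async_example",
    )
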
6 changes: 3 additions & 3 deletions tests/airflow/test_graph.py
@@ -21,6 +21,7 @@
)
from cosmos.config import ProfileConfig, RenderConfig
from cosmos.constants import (
+ DBT_COMPILE_TASK_ID,
DbtResourceType,
ExecutionMode,
SourceRenderingBehavior,
@@ -30,7 +31,6 @@
from cosmos.converter import airflow_kwargs
from cosmos.dbt.graph import DbtNode
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos.settings import dbt_compile_task_id

SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/")
SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none"))
@@ -258,8 +258,8 @@ def test_build_airflow_graph_with_dbt_compile_task():
)

task_ids = [task.task_id for task in dag.tasks]
- assert dbt_compile_task_id in task_ids
- assert dbt_compile_task_id in dag.tasks[0].upstream_task_ids
+ assert DBT_COMPILE_TASK_ID in task_ids
+ assert DBT_COMPILE_TASK_ID in dag.tasks[0].upstream_task_ids


def test_calculate_operator_class():
