Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task owner to dbt operator #1082

Merged
merged 14 commits into from
Jul 23, 2024
4 changes: 4 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def create_test_task_metadata(
task_args["on_warning_callback"] = on_warning_callback
extra_context = {}

task_owner = ""
if test_indirect_selection != TestIndirectSelection.EAGER:
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
Expand All @@ -106,6 +107,7 @@ def create_test_task_metadata(
task_args["select"] = node.resource_name

extra_context = {"dbt_node_config": node.context_dict}
task_owner = node.owner

elif render_config is not None: # TestBehavior.AFTER_ALL
task_args["select"] = render_config.select
Expand All @@ -114,6 +116,7 @@ def create_test_task_metadata(

return TaskMetadata(
id=test_task_name,
owner=task_owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode,
dbt_class="DbtTest",
Expand Down Expand Up @@ -158,6 +161,7 @@ def create_task_metadata(

task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
Expand Down
6 changes: 6 additions & 0 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
module = importlib.import_module(module_name)
Operator = getattr(module, class_name)

if task.owner != "":
task_owner = task.owner
else:
task_owner = dag.owner

airflow_task = Operator(
task_id=task.id,
dag=dag,
task_group=task_group,
owner=task_owner,
extra_context=task.extra_context,
**task.arguments,
)
Expand Down
1 change: 1 addition & 0 deletions cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Task(CosmosEntity):
:param arguments: The arguments to pass to the operator
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
4 changes: 4 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def name(self) -> str:
"""
return self.resource_name.replace(".", "_")

@property
def owner(self) -> str:
return str(self.config.get("meta", {}).get("owner", ""))

@property
def context_dict(self) -> dict[str, Any]:
"""
Expand Down
8 changes: 7 additions & 1 deletion tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
depends_on=[parent_seed.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql",
tags=["has_child"],
config={"materialized": "view"},
config={"materialized": "view", "meta": {"owner": "parent_node"}},
tatiana marked this conversation as resolved.
Show resolved Hide resolved
has_test=True,
)
test_parent_node = DbtNode(
Expand Down Expand Up @@ -123,6 +123,12 @@ def test_build_airflow_graph_with_after_each():
assert dag.leaves[0].task_id == "child_run"
assert dag.leaves[1].task_id == "child2_v2_run"

task_seed_parent_seed = dag.tasks[0]
task_parent_run = dag.tasks[1]

assert task_seed_parent_seed.owner == ""
assert task_parent_run.owner == "parent_node"


@pytest.mark.parametrize(
"node_type,task_suffix",
Expand Down
Loading