Remove dag.run() method #42761

Merged · 1 commit · Oct 10, 2024
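For DAG files that still call the removed method, the replacement used throughout this PR is ``DAG.test()`` (or the ``airflow dags test`` CLI). A minimal before/after sketch, assuming a module-level ``dag`` object:

.. code-block:: python

    # before (removed in this PR): ran a BackfillJobRunner under the hood
    dag.clear()
    dag.run()

    # after: runs the dag in a single in-process loop
    dag.test()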
2 changes: 2 additions & 0 deletions airflow/executors/executor_loader.py
@@ -173,6 +173,8 @@ def set_default_executor(cls, executor: BaseExecutor) -> None:

This is used in rare cases such as dag.run which allows, as a user convenience, to provide
the executor by cli/argument instead of Airflow configuration

todo: given comments above, is this needed anymore since DAG.run is removed?
"""
exec_class_name = executor.__class__.__qualname__
exec_name = ExecutorName(f"{executor.__module__}.{exec_class_name}")
83 changes: 2 additions & 81 deletions airflow/models/dag.py
@@ -28,7 +28,6 @@
import sys
import time
import traceback
import warnings
import weakref
from collections import abc, defaultdict, deque
from contextlib import ExitStack
@@ -88,13 +87,11 @@
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
RemovedInAirflow3Warning,
TaskDeferred,
TaskNotFound,
UnknownExecutorException,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import run_job
from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
from airflow.models.asset import (
AssetDagRunQueue,
@@ -2296,84 +2293,8 @@ def _remove_task(self, task_id: str) -> None:

self.task_count = len(self.task_dict)

def run(
self,
start_date=None,
end_date=None,
mark_success=False,
local=False,
donot_pickle=airflow_conf.getboolean("core", "donot_pickle"),
ignore_task_deps=False,
ignore_first_depends_on_past=True,
pool=None,
delay_on_limit_secs=1.0,
verbose=False,
conf=None,
rerun_failed_tasks=False,
run_backwards=False,
run_at_least_once=False,
continue_on_failures=False,
disable_retry=False,
):
"""
Run the DAG.

:param start_date: the start date of the range to run
:param end_date: the end date of the range to run
:param mark_success: True to mark jobs as succeeded without running them
:param local: True to run the tasks using the LocalExecutor
:param donot_pickle: True to avoid pickling DAG object and send to workers
:param ignore_task_deps: True to skip upstream tasks
:param ignore_first_depends_on_past: True to ignore depends_on_past
dependencies for the first set of tasks only
:param pool: Resource pool to use
:param delay_on_limit_secs: Time in seconds to wait before next attempt to run
dag run when max_active_runs limit has been reached
:param verbose: Make logging output more verbose
:param conf: user defined dictionary passed from CLI
:param rerun_failed_tasks:
:param run_backwards:
:param run_at_least_once: If true, always run the DAG at least once even
if no logical run exists within the time range.
"""
warnings.warn(
"`DAG.run()` is deprecated and will be removed in Airflow 3.0. Consider "
"using `DAG.test()` instead, or trigger your dag via API.",
RemovedInAirflow3Warning,
stacklevel=2,
)

from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.backfill_job_runner import BackfillJobRunner

if local:
from airflow.executors.local_executor import LocalExecutor

ExecutorLoader.set_default_executor(LocalExecutor())

from airflow.jobs.job import Job

job = Job()
job_runner = BackfillJobRunner(
job=job,
dag=self,
start_date=start_date,
end_date=end_date,
mark_success=mark_success,
donot_pickle=donot_pickle,
ignore_task_deps=ignore_task_deps,
ignore_first_depends_on_past=ignore_first_depends_on_past,
pool=pool,
delay_on_limit_secs=delay_on_limit_secs,
verbose=verbose,
conf=conf,
rerun_failed_tasks=rerun_failed_tasks,
run_backwards=run_backwards,
run_at_least_once=run_at_least_once,
continue_on_failures=continue_on_failures,
disable_retry=disable_retry,
)
run_job(job=job, execute_callable=job_runner._execute)
def run(self, *args, **kwargs):
"""Leaving this here to be removed in other PR for simpler review."""

def cli(self):
"""Exposes a CLI specific to this DAG."""
@@ -78,35 +78,14 @@ It requires "airflow-env" virtual environment configured locally.

- Copy any example DAG present in the ``/airflow/example_dags`` directory to ``/files/dags/``.

- Add a ``__main__`` block at the end of your DAG file to make it runnable. It will run a ``back_fill`` job:
- Add a ``__main__`` block at the end of your DAG file to make it runnable:

.. code-block:: python

if __name__ == "__main__":
dag.clear()
dag.run()
dag.test()

- Add ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` to Environment variable of Run Configuration.

- Click on Add configuration

.. raw:: html

<div align="center" style="padding-bottom:10px">
<img src="images/pycharm_add_configuration.png"
alt="Add Configuration pycharm">
</div>

- Add Script Path and Environment Variable to new Python configuration

.. raw:: html

<div align="center" style="padding-bottom:10px">
<img src="images/pycharm_add_env_variable.png"
alt="Add environment variable pycharm">
</div>

- Now Debug an example dag and view the entries in tables such as ``dag_run, xcom`` etc in MySQL Workbench.
- Run the file.
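Putting the updated doc together, a self-contained file of this shape can be run or debugged directly (a sketch; the dag id and task are illustrative, not taken from this PR):

.. code-block:: python

    import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="debug_me",
        start_date=datetime.datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        EmptyOperator(task_id="noop")

    if __name__ == "__main__":
        dag.test()  # runs the whole dag in one process, no scheduler needed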

Creating a branch
#################
@@ -72,8 +72,7 @@ Setting up debugging


if __name__ == "__main__":
dag.clear()
dag.run()
dag.test()

- Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration.

Binary file not shown.
Binary file not shown.
15 changes: 3 additions & 12 deletions contributing-docs/testing/dag_testing.rst
@@ -20,31 +20,22 @@ DAG Testing
===========

To ease and speed up the process of developing DAGs, you can use
py:class:`~airflow.executors.debug_executor.DebugExecutor`, which is a single process executor
for debugging purposes. Using this executor, you can run and debug DAGs from your IDE.
py:meth:`~airflow.models.dag.DAG.test`, which will run a dag in a single process.

To set up the IDE:

1. Add ``main`` block at the end of your DAG file to make it runnable.
It will run a backfill job:

.. code-block:: python

if __name__ == "__main__":
dag.clear()
dag.run()
dag.test()


2. Set up ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in the run configuration of your IDE.
Make sure to also set up all environment variables required by your DAG.

3. Run and debug the DAG file.

Additionally, ``DebugExecutor`` can be used in a fail-fast mode that will make
all other running or scheduled tasks fail immediately. To enable this option, set
``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``.

Also, with the Airflow CLI command ``airflow dags test``, you can execute one complete run of a DAG:
You can also run the dag in the same manner with the Airflow CLI command ``airflow dags test``:

.. code-block:: bash

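    # illustrative invocation: the dag id and logical date are placeholders
    airflow dags test example_bash_operator 2024-10-10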
7 changes: 5 additions & 2 deletions dev/tests_common/test_utils/system_tests.py
@@ -30,7 +30,7 @@
logger = logging.getLogger(__name__)


def get_test_run(dag):
def get_test_run(dag, **test_kwargs):
def callback(context: Context):
ti = context["dag_run"].get_task_instances()
if not ti:
@@ -60,7 +60,10 @@ def test_run():
dag.on_success_callback = add_callback(dag.on_success_callback, callback)
# If the env variable ``_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR`` is set, then use an executor to run the
# DAG
dag_run = dag.test(use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") == "1")
dag_run = dag.test(
use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") == "1",
**test_kwargs,
)
assert (
dag_run.state == DagRunState.SUCCESS
), "The system test failed, please look at the logs to find out the underlying failed task(s)"
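Since ``get_test_run`` now forwards ``**test_kwargs`` to ``dag.test()``, a test module can tune a run without changing the helper. A sketch of the call pattern (the imported module mirrors the dataprep test below; the ``run_conf`` value is illustrative):

.. code-block:: python

    import os

    from dev.tests_common.test_utils.system_tests import get_test_run
    from providers.tests.system.google.cloud.dataprep.example_dataprep import dag

    # extra kwargs are passed straight through to dag.test()
    run = get_test_run(dag, run_conf={"sample": "value"})

    if __name__ == "__main__":
        # opt in to the executor-backed path used in CI
        os.environ["_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR"] = "1"
        run()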
26 changes: 0 additions & 26 deletions dev/tests_common/test_utils/system_tests_class.py
@@ -28,7 +28,6 @@

from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config
from airflow.exceptions import AirflowException
from airflow.models.dagbag import DagBag

from dev.tests_common.test_utils import AIRFLOW_MAIN_FOLDER
from dev.tests_common.test_utils.logging_command_executor import get_executor
@@ -131,31 +130,6 @@ def _print_all_log_files():
with open(filepath) as f:
print(f.read())

def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None:
"""
Runs example dag by its ID.

:param dag_id: id of a DAG to be run
:param dag_folder: directory where to look for the specific DAG. Relative to AIRFLOW_HOME.
"""
self.log.info("Looking for DAG: %s in %s", dag_id, dag_folder)
dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
dag = dag_bag.get_dag(dag_id)
if dag is None:
raise AirflowException(
f"The Dag {dag_id} could not be found. It's either an import problem, wrong dag_id or DAG is "
"not in provided dag_folder.The content of "
f"the {dag_folder} folder is {os.listdir(dag_folder)}"
)

self.log.info("Attempting to run DAG: %s", dag_id)
dag.clear()
try:
dag.run(ignore_first_depends_on_past=True, verbose=True)
except Exception:
self._print_all_log_files()
raise

@staticmethod
def create_dummy_file(filename, dir_path="/tmp"):
os.makedirs(dir_path, exist_ok=True)
13 changes: 2 additions & 11 deletions docs/apache-airflow/core-concepts/debug.rst
@@ -122,18 +122,9 @@ For more information on setting the configuration, see :doc:`../../howto/set-config`

1. Add ``main`` block at the end of your DAG file to make it runnable.

It will run a backfill job:

.. code-block:: python

if __name__ == "__main__":
from airflow.utils.state import State

dag.clear()
dag.run()


2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of your IDE. In
this step you should also setup all environment variables required by your DAG.
dag.test()

3. Run / debug the DAG file.
2. Run / debug the DAG file.
@@ -92,7 +92,7 @@
"example_display_video_misc",
start_date=START_DATE,
catchup=False,
) as dag2:
) as dag_example_display_video_misc:
# [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query]
upload_erf_to_bq = GCSToBigQueryOperator(
task_id="upload_erf_to_bq",
Expand Down Expand Up @@ -125,7 +125,7 @@
"example_display_video_sdf",
start_date=START_DATE,
catchup=False,
) as dag3:
) as dag_example_display_video_sdf:
# [START howto_google_display_video_create_sdf_download_task_operator]
create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST
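The ``dag2``/``dag3`` renames matter because, with ``run_dag`` gone, system tests import the dag object directly instead of looking it up by id, so each dag in a multi-dag module needs a distinct variable name. A sketch of the resulting pattern (the module path is hypothetical; the real file name is not shown in this diff):

.. code-block:: python

    # hypothetical module path -- adjust to the actual example_dags module
    from airflow.providers.google.marketing_platform.example_dags.example_display_video import (
        dag_example_display_video_misc,
    )

    from dev.tests_common.test_utils.system_tests import get_test_run

    run = get_test_run(dag_example_display_video_misc)
    run()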
@@ -26,7 +26,8 @@
from airflow.utils.session import create_session

from dev.tests_common.test_utils.db import clear_db_connections
from dev.tests_common.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
from dev.tests_common.test_utils.gcp_system_helpers import GoogleSystemTest
from dev.tests_common.test_utils.system_tests import get_test_run

TOKEN = os.environ.get("DATAPREP_TOKEN")
EXTRA = {"token": TOKEN}
@@ -52,4 +53,7 @@ def teardown_method(self):
clear_db_connections()

def test_run_example_dag(self):
self.run_dag(dag_id="example_dataprep", dag_folder=CLOUD_DAG_FOLDER)
from providers.tests.system.google.cloud.dataprep.example_dataprep import dag

run = get_test_run(dag)
run()
@@ -44,8 +44,8 @@ def teardown_method(self):

@provide_gcp_context(GCP_DATASTORE_KEY)
def test_run_example_dag(self):
self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER)
self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER) # this dag does not exist?

@provide_gcp_context(GCP_DATASTORE_KEY)
def test_run_example_dag_operations(self):
self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER)
self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER) # this dag does not exist?
@@ -25,13 +25,14 @@

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.google.cloud.example_dags import example_facebook_ads_to_gcs
from airflow.utils.process_utils import patch_environ

from dev.tests_common.test_utils.gcp_system_helpers import (
CLOUD_DAG_FOLDER,
GoogleSystemTest,
provide_gcp_context,
)
from dev.tests_common.test_utils.system_tests import get_test_run
from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY

CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys")
Expand Down Expand Up @@ -71,4 +72,5 @@ class TestFacebookAdsToGcsExampleDagsSystem(GoogleSystemTest):
@provide_gcp_context(GCP_BIGQUERY_KEY)
@provide_facebook_connection(FACEBOOK_CREDENTIALS_PATH)
def test_dag_example(self):
self.run_dag("example_facebook_ads_to_gcs", CLOUD_DAG_FOLDER)
run = get_test_run(example_facebook_ads_to_gcs.dag)
run()
@@ -20,12 +20,14 @@

import pytest

from airflow.providers.google.cloud.example_dags import example_salesforce_to_gcs

from dev.tests_common.test_utils.gcp_system_helpers import (
CLOUD_DAG_FOLDER,
GoogleSystemTest,
provide_gcp_context,
)
from dev.tests_common.test_utils.salesforce_system_helpers import provide_salesforce_connection
from dev.tests_common.test_utils.system_tests import get_test_run
from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY

CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys")
@@ -42,4 +44,5 @@ class TestSalesforceIntoGCSExample(GoogleSystemTest):
@provide_gcp_context(GCP_BIGQUERY_KEY)
@provide_salesforce_connection(SALESFORCE_CREDENTIALS_PATH)
def test_run_example_dag_salesforce_to_gcs_operator(self):
self.run_dag("example_salesforce_to_gcs", CLOUD_DAG_FOLDER)
run = get_test_run(example_salesforce_to_gcs.dag)
run()