diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 1eeee1ff68a9..ec79860918b8 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -173,6 +173,8 @@ def set_default_executor(cls, executor: BaseExecutor) -> None: This is used in rare cases such as dag.run which allows, as a user convenience, to provide the executor by cli/argument instead of Airflow configuration + + todo: given comments above, is this needed anymore since DAG.run is removed? """ exec_class_name = executor.__class__.__qualname__ exec_name = ExecutorName(f"{executor.__module__}.{exec_class_name}") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 2dc425daa054..5cc5cf443140 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -28,7 +28,6 @@ import sys import time import traceback -import warnings import weakref from collections import abc, defaultdict, deque from contextlib import ExitStack @@ -88,13 +87,11 @@ DuplicateTaskIdFound, FailStopDagInvalidTriggerRule, ParamValidationError, - RemovedInAirflow3Warning, TaskDeferred, TaskNotFound, UnknownExecutorException, ) from airflow.executors.executor_loader import ExecutorLoader -from airflow.jobs.job import run_job from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback from airflow.models.asset import ( AssetDagRunQueue, @@ -2296,84 +2293,8 @@ def _remove_task(self, task_id: str) -> None: self.task_count = len(self.task_dict) - def run( - self, - start_date=None, - end_date=None, - mark_success=False, - local=False, - donot_pickle=airflow_conf.getboolean("core", "donot_pickle"), - ignore_task_deps=False, - ignore_first_depends_on_past=True, - pool=None, - delay_on_limit_secs=1.0, - verbose=False, - conf=None, - rerun_failed_tasks=False, - run_backwards=False, - run_at_least_once=False, - continue_on_failures=False, - disable_retry=False, - ): - """ - Run the DAG. - - :param start_date: the start date of the range to run - :param end_date: the end date of the range to run - :param mark_success: True to mark jobs as succeeded without running them - :param local: True to run the tasks using the LocalExecutor - :param donot_pickle: True to avoid pickling DAG object and send to workers - :param ignore_task_deps: True to skip upstream tasks - :param ignore_first_depends_on_past: True to ignore depends_on_past - dependencies for the first set of tasks only - :param pool: Resource pool to use - :param delay_on_limit_secs: Time in seconds to wait before next attempt to run - dag run when max_active_runs limit has been reached - :param verbose: Make logging output more verbose - :param conf: user defined dictionary passed from CLI - :param rerun_failed_tasks: - :param run_backwards: - :param run_at_least_once: If true, always run the DAG at least once even - if no logical run exists within the time range. - """ - warnings.warn( - "`DAG.run()` is deprecated and will be removed in Airflow 3.0. 
Consider " - "using `DAG.test()` instead, or trigger your dag via API.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - - from airflow.executors.executor_loader import ExecutorLoader - from airflow.jobs.backfill_job_runner import BackfillJobRunner - - if local: - from airflow.executors.local_executor import LocalExecutor - - ExecutorLoader.set_default_executor(LocalExecutor()) - - from airflow.jobs.job import Job - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=self, - start_date=start_date, - end_date=end_date, - mark_success=mark_success, - donot_pickle=donot_pickle, - ignore_task_deps=ignore_task_deps, - ignore_first_depends_on_past=ignore_first_depends_on_past, - pool=pool, - delay_on_limit_secs=delay_on_limit_secs, - verbose=verbose, - conf=conf, - rerun_failed_tasks=rerun_failed_tasks, - run_backwards=run_backwards, - run_at_least_once=run_at_least_once, - continue_on_failures=continue_on_failures, - disable_retry=disable_retry, - ) - run_job(job=job, execute_callable=job_runner._execute) + def run(self, *args, **kwargs): + """Leaving this here to be removed in other PR for simpler review.""" def cli(self): """Exposes a CLI specific to this DAG.""" diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst b/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst index d830496b2720..4a3319ae97dd 100644 --- a/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst +++ b/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst @@ -78,35 +78,14 @@ It requires "airflow-env" virtual environment configured locally. - Copy any example DAG present in the ``/airflow/example_dags`` directory to ``/files/dags/``. -- Add a ``__main__`` block at the end of your DAG file to make it runnable. It will run a ``back_fill`` job: +- Add a ``__main__`` block at the end of your DAG file to make it runnable: .. code-block:: python if __name__ == "__main__": - dag.clear() - dag.run() + dag.test() -- Add ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` to Environment variable of Run Configuration. - - - Click on Add configuration - - .. raw:: html - -
-    <img src="images/pycharm_add_configuration.png" alt="Add Configuration pycharm">
-
-  - Add Script Path and Environment Variable to new Python configuration
-
-  .. raw:: html
-
-    <img src="images/pycharm_add_env_variable.png" alt="Add environment variable pycharm">
- -- Now Debug an example dag and view the entries in tables such as ``dag_run, xcom`` etc in MySQL Workbench. +- Run the file. Creating a branch ################# diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst index 88ff1fdd84e5..61fdf501063d 100644 --- a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst +++ b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst @@ -72,8 +72,7 @@ Setting up debugging if __name__ == "__main__": - dag.clear() - dag.run() + dag.test() - Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration. diff --git a/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png b/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png deleted file mode 100644 index 525b73e6141a..000000000000 Binary files a/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png and /dev/null differ diff --git a/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png b/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png deleted file mode 100644 index f40837221140..000000000000 Binary files a/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png and /dev/null differ diff --git a/contributing-docs/testing/dag_testing.rst b/contributing-docs/testing/dag_testing.rst index 7e311171ce01..0bf506c2f321 100644 --- a/contributing-docs/testing/dag_testing.rst +++ b/contributing-docs/testing/dag_testing.rst @@ -20,31 +20,22 @@ DAG Testing =========== To ease and speed up the process of developing DAGs, you can use -py:class:`~airflow.executors.debug_executor.DebugExecutor`, which is a single process executor -for debugging purposes. Using this executor, you can run and debug DAGs from your IDE. +py:meth:`~airflow.models.dag.DAG.test`, which will run a dag in a single process. To set up the IDE: 1. Add ``main`` block at the end of your DAG file to make it runnable. -It will run a backfill job: .. code-block:: python if __name__ == "__main__": - dag.clear() - dag.run() + dag.test() -2. Set up ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in the run configuration of your IDE. - Make sure to also set up all environment variables required by your DAG. - 3. Run and debug the DAG file. -Additionally, ``DebugExecutor`` can be used in a fail-fast mode that will make -all other running or scheduled tasks fail immediately. To enable this option, set -``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``. -Also, with the Airflow CLI command ``airflow dags test``, you can execute one complete run of a DAG: +You can also run the dag in the same manner with the Airflow CLI command ``airflow dags test``: .. 
code-block:: bash diff --git a/dev/tests_common/test_utils/system_tests.py b/dev/tests_common/test_utils/system_tests.py index 578ee6cc04d4..6558ae2d1e4c 100644 --- a/dev/tests_common/test_utils/system_tests.py +++ b/dev/tests_common/test_utils/system_tests.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) -def get_test_run(dag): +def get_test_run(dag, **test_kwargs): def callback(context: Context): ti = context["dag_run"].get_task_instances() if not ti: @@ -60,7 +60,10 @@ def test_run(): dag.on_success_callback = add_callback(dag.on_success_callback, callback) # If the env variable ``_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR`` is set, then use an executor to run the # DAG - dag_run = dag.test(use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") == "1") + dag_run = dag.test( + use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") == "1", + **test_kwargs, + ) assert ( dag_run.state == DagRunState.SUCCESS ), "The system test failed, please look at the logs to find out the underlying failed task(s)" diff --git a/dev/tests_common/test_utils/system_tests_class.py b/dev/tests_common/test_utils/system_tests_class.py index 836782b8584c..5abdca96bee0 100644 --- a/dev/tests_common/test_utils/system_tests_class.py +++ b/dev/tests_common/test_utils/system_tests_class.py @@ -28,7 +28,6 @@ from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config from airflow.exceptions import AirflowException -from airflow.models.dagbag import DagBag from dev.tests_common.test_utils import AIRFLOW_MAIN_FOLDER from dev.tests_common.test_utils.logging_command_executor import get_executor @@ -131,31 +130,6 @@ def _print_all_log_files(): with open(filepath) as f: print(f.read()) - def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None: - """ - Runs example dag by its ID. - - :param dag_id: id of a DAG to be run - :param dag_folder: directory where to look for the specific DAG. Relative to AIRFLOW_HOME. - """ - self.log.info("Looking for DAG: %s in %s", dag_id, dag_folder) - dag_bag = DagBag(dag_folder=dag_folder, include_examples=False) - dag = dag_bag.get_dag(dag_id) - if dag is None: - raise AirflowException( - f"The Dag {dag_id} could not be found. It's either an import problem, wrong dag_id or DAG is " - "not in provided dag_folder.The content of " - f"the {dag_folder} folder is {os.listdir(dag_folder)}" - ) - - self.log.info("Attempting to run DAG: %s", dag_id) - dag.clear() - try: - dag.run(ignore_first_depends_on_past=True, verbose=True) - except Exception: - self._print_all_log_files() - raise - @staticmethod def create_dummy_file(filename, dir_path="/tmp"): os.makedirs(dir_path, exist_ok=True) diff --git a/docs/apache-airflow/core-concepts/debug.rst b/docs/apache-airflow/core-concepts/debug.rst index 9ab7819b8b30..d58c49085452 100644 --- a/docs/apache-airflow/core-concepts/debug.rst +++ b/docs/apache-airflow/core-concepts/debug.rst @@ -122,18 +122,9 @@ For more information on setting the configuration, see :doc:`../../howto/set-con 1. Add ``main`` block at the end of your DAG file to make it runnable. -It will run a backfill job: - .. code-block:: python if __name__ == "__main__": - from airflow.utils.state import State - - dag.clear() - dag.run() - - -2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of your IDE. In - this step you should also setup all environment variables required by your DAG. + dag.test() -3. Run / debug the DAG file. +2. Run / debug the DAG file. 
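For reference, the pattern these documentation changes converge on is a DAG file that can be executed directly with ``dag.test()``. A minimal sketch, assuming a hypothetical DAG id and task (neither appears in this diff), might look like:

.. code-block:: python

    from __future__ import annotations

    import pendulum

    from airflow.models.dag import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="my_debuggable_dag",  # hypothetical id, used only for illustration
        start_date=pendulum.datetime(2021, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        PythonOperator(task_id="hello", python_callable=lambda: print("hello"))

    if __name__ == "__main__":
        # Runs one dag run in a single process, without a scheduler or separate executor.
        dag.test()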
diff --git a/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py index 33abc67b639d..3c008ee5ca26 100644 --- a/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py +++ b/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py @@ -92,7 +92,7 @@ "example_display_video_misc", start_date=START_DATE, catchup=False, -) as dag2: +) as dag_example_display_video_misc: # [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query] upload_erf_to_bq = GCSToBigQueryOperator( task_id="upload_erf_to_bq", @@ -125,7 +125,7 @@ "example_display_video_sdf", start_date=START_DATE, catchup=False, -) as dag3: +) as dag_example_display_video_sdf: # [START howto_google_display_video_create_sdf_download_task_operator] create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator( task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST diff --git a/providers/tests/google/cloud/operators/test_dataprep_system.py b/providers/tests/google/cloud/operators/test_dataprep_system.py index 96f47fa3e365..dad77ac4ff80 100644 --- a/providers/tests/google/cloud/operators/test_dataprep_system.py +++ b/providers/tests/google/cloud/operators/test_dataprep_system.py @@ -26,7 +26,8 @@ from airflow.utils.session import create_session from dev.tests_common.test_utils.db import clear_db_connections -from dev.tests_common.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest +from dev.tests_common.test_utils.gcp_system_helpers import GoogleSystemTest +from dev.tests_common.test_utils.system_tests import get_test_run TOKEN = os.environ.get("DATAPREP_TOKEN") EXTRA = {"token": TOKEN} @@ -52,4 +53,7 @@ def teardown_method(self): clear_db_connections() def test_run_example_dag(self): - self.run_dag(dag_id="example_dataprep", dag_folder=CLOUD_DAG_FOLDER) + from providers.tests.system.google.cloud.dataprep.example_dataprep import dag + + run = get_test_run(dag) + run() diff --git a/providers/tests/google/cloud/operators/test_datastore_system.py b/providers/tests/google/cloud/operators/test_datastore_system.py index a98215a5317a..8807288358ba 100644 --- a/providers/tests/google/cloud/operators/test_datastore_system.py +++ b/providers/tests/google/cloud/operators/test_datastore_system.py @@ -44,8 +44,8 @@ def teardown_method(self): @provide_gcp_context(GCP_DATASTORE_KEY) def test_run_example_dag(self): - self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER) + self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER) # this dag does not exist? @provide_gcp_context(GCP_DATASTORE_KEY) def test_run_example_dag_operations(self): - self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER) + self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER) # this dag does not exist? 
diff --git a/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py b/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py index ba24a0c34da2..9cb0a9be5264 100644 --- a/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py +++ b/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py @@ -25,13 +25,14 @@ from airflow.exceptions import AirflowException from airflow.models import Connection +from airflow.providers.google.cloud.example_dags import example_facebook_ads_to_gcs from airflow.utils.process_utils import patch_environ from dev.tests_common.test_utils.gcp_system_helpers import ( - CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context, ) +from dev.tests_common.test_utils.system_tests import get_test_run from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys") @@ -71,4 +72,5 @@ class TestFacebookAdsToGcsExampleDagsSystem(GoogleSystemTest): @provide_gcp_context(GCP_BIGQUERY_KEY) @provide_facebook_connection(FACEBOOK_CREDENTIALS_PATH) def test_dag_example(self): - self.run_dag("example_facebook_ads_to_gcs", CLOUD_DAG_FOLDER) + run = get_test_run(example_facebook_ads_to_gcs.dag) + run() diff --git a/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py b/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py index afd0856fad24..d556f2d86e2f 100644 --- a/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py +++ b/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py @@ -20,12 +20,14 @@ import pytest +from airflow.providers.google.cloud.example_dags import example_salesforce_to_gcs + from dev.tests_common.test_utils.gcp_system_helpers import ( - CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context, ) from dev.tests_common.test_utils.salesforce_system_helpers import provide_salesforce_connection +from dev.tests_common.test_utils.system_tests import get_test_run from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys") @@ -42,4 +44,5 @@ class TestSalesforceIntoGCSExample(GoogleSystemTest): @provide_gcp_context(GCP_BIGQUERY_KEY) @provide_salesforce_connection(SALESFORCE_CREDENTIALS_PATH) def test_run_example_dag_salesforce_to_gcs_operator(self): - self.run_dag("example_salesforce_to_gcs", CLOUD_DAG_FOLDER) + run = get_test_run(example_salesforce_to_gcs.dag) + run() diff --git a/providers/tests/google/marketing_platform/operators/test_display_video_system.py b/providers/tests/google/marketing_platform/operators/test_display_video_system.py index 78f5d4ee021f..49f44948abfd 100644 --- a/providers/tests/google/marketing_platform/operators/test_display_video_system.py +++ b/providers/tests/google/marketing_platform/operators/test_display_video_system.py @@ -19,13 +19,18 @@ import pytest from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET +from airflow.providers.google.marketing_platform.example_dags.example_display_video import ( + BUCKET, + dag_example_display_video_misc, + dag_example_display_video_sdf, +) from dev.tests_common.test_utils.gcp_system_helpers import ( MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context, ) +from dev.tests_common.test_utils.system_tests import get_test_run from 
providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY # Requires the following scope: @@ -50,12 +55,14 @@ def teardown_method(self): @provide_gcp_context(GMP_KEY, scopes=SCOPES) def test_run_example_dag(self): - self.run_dag("example_display_video", MARKETING_DAG_FOLDER) + self.run_dag("example_display_video", MARKETING_DAG_FOLDER) # this dag does not exist? @provide_gcp_context(GMP_KEY, scopes=SCOPES) def test_run_example_dag_misc(self): - self.run_dag("example_display_video_misc", MARKETING_DAG_FOLDER) + run = get_test_run(dag_example_display_video_misc) + run() @provide_gcp_context(GMP_KEY, scopes=SCOPES) def test_run_example_dag_sdf(self): - self.run_dag("example_display_video_sdf", MARKETING_DAG_FOLDER) + run = get_test_run(dag_example_display_video_sdf) + run() diff --git a/providers/tests/system/google/cloud/dataprep/example_dataprep.py b/providers/tests/system/google/cloud/dataprep/example_dataprep.py index 9f603f43fb1b..cdc736a41c66 100644 --- a/providers/tests/system/google/cloud/dataprep/example_dataprep.py +++ b/providers/tests/system/google/cloud/dataprep/example_dataprep.py @@ -313,6 +313,7 @@ def delete_connection(connection_id: str) -> None: # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() + from dev.tests_common.test_utils.system_tests import get_test_run # noqa: E402 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 00364794f1d5..f0c9a18c1a56 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -322,7 +322,7 @@ def test_cli_backfill_ignore_first_depends_on_past(self, mock_run): We just check we call dag.run() right. The behaviour of that kwarg is tested in test_jobs """ - dag_id = "test_dagrun_states_deadlock" + dag_id = "example_bash_operator" run_date = DEFAULT_DATE + timedelta(days=1) args = [ "dags", diff --git a/tests/core/test_example_dags_system.py b/tests/core/test_example_dags_system.py index c60b7325b125..bd34d9bb1591 100644 --- a/tests/core/test_example_dags_system.py +++ b/tests/core/test_example_dags_system.py @@ -17,16 +17,128 @@ # under the License. 
from __future__ import annotations +from datetime import timedelta + +import pendulum import pytest +from sqlalchemy import select + +from airflow.models import DagRun +from airflow.operators.python import PythonOperator +from airflow.utils.module_loading import import_string +from airflow.utils.state import DagRunState +from airflow.utils.trigger_rule import TriggerRule +from dev.tests_common.test_utils.system_tests import get_test_run from dev.tests_common.test_utils.system_tests_class import SystemTest +def fail(): + raise ValueError + + +def get_dag_success(dag_maker): + with dag_maker( + dag_id="test_dagrun_states_success", + schedule=timedelta(days=1), + ) as dag: + dag4_task1 = PythonOperator( + task_id="test_dagrun_fail", + python_callable=fail, + ) + dag4_task2 = PythonOperator( + task_id="test_dagrun_succeed", trigger_rule=TriggerRule.ALL_FAILED, python_callable=print + ) + dag4_task2.set_upstream(dag4_task1) + return dag + + +def get_dag_fail(dag_maker): + with dag_maker( + dag_id="test_dagrun_states_fail", + schedule=timedelta(days=1), + ) as dag: + dag3_task1 = PythonOperator(task_id="to_fail", python_callable=fail) + dag3_task2 = PythonOperator(task_id="to_succeed", python_callable=print) + dag3_task2.set_upstream(dag3_task1) + return dag + + +def get_dag_fail_root(dag_maker): + with dag_maker( + dag_id="test_dagrun_states_root_fail", + schedule=timedelta(days=1), + ) as dag: + PythonOperator(task_id="test_dagrun_succeed", python_callable=print) + PythonOperator( + task_id="test_dagrun_fail", + python_callable=fail, + ) + return dag + + @pytest.mark.system("core") class TestExampleDagsSystem(SystemTest): @pytest.mark.parametrize( - "dag_id", + "module", ["example_bash_operator", "example_branch_operator", "tutorial_dag", "example_dag_decorator"], ) - def test_dag_example(self, dag_id): - self.run_dag(dag_id=dag_id) + def test_dag_example(self, module): + test_run = import_string(f"airflow.example_dags.{module}.test_run") + test_run() + + @pytest.mark.parametrize( + "factory, expected", + [ + (get_dag_fail, "failed"), + (get_dag_fail_root, "failed"), + (get_dag_success, "success"), + ], + ) + def test_dag_run_final_state(self, factory, expected, dag_maker, session): + """ + These tests are migrated tests that were added in PR #1289 + which was fixing issue #1225. + + I would be very surprised if these things were not covered elsewhere already + but, just in case, I'm migrating them to system tests. + """ + dag = factory(dag_maker) + run = get_test_run(dag) + with pytest.raises(AssertionError, match="The system test failed"): + run() + dr = session.scalar(select(DagRun)) + assert dr.state == "failed" + + def test_dag_root_task_start_date_future(self, dag_maker, session): + """ + These tests are migrated tests that were added in PR #1289 + which was fixing issue #1225. + + This one tests what happens when there's a dag with a root task with future start date. + + The dag should run, but no TI should be created for the task where start date in future. 
+ """ + exec_date = pendulum.datetime(2021, 1, 1) + fut_start_date = pendulum.datetime(2021, 2, 1) + with dag_maker( + dag_id="dagrun_states_root_future", + schedule=timedelta(days=1), + catchup=False, + ) as dag: + PythonOperator( + task_id="current", + python_callable=lambda: print("hello"), + ) + PythonOperator( + task_id="future", + python_callable=lambda: print("hello"), + start_date=fut_start_date, + ) + run = get_test_run(dag, execution_date=exec_date) + run() + dr = session.scalar(select(DagRun)) + tis = dr.task_instances + assert dr.state == DagRunState.SUCCESS + assert len(tis) == 1 + assert tis[0].task_id == "current" diff --git a/tests/dags/test_future_start_date.py b/tests/dags/test_future_start_date.py new file mode 100644 index 000000000000..dadfbff600f6 --- /dev/null +++ b/tests/dags/test_future_start_date.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import timedelta + +import pendulum + +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import PythonOperator + +exec_date = pendulum.datetime(2021, 1, 1) +fut_start_date = pendulum.datetime(2021, 2, 1) +with DAG( + dag_id="test_dagrun_states_root_future", + schedule=timedelta(days=1), + catchup=True, + start_date=exec_date, +) as dag: + EmptyOperator(task_id="current") + PythonOperator( + task_id="future", + python_callable=lambda: print("hello"), + start_date=fut_start_date, + ) diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py deleted file mode 100644 index 96a3ad156269..000000000000 --- a/tests/dags/test_issue_1225.py +++ /dev/null @@ -1,149 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -DAG designed to test what happens when a DAG with pooled tasks is run -by a BackfillJob. -Addresses issue #1225. 
-""" - -from __future__ import annotations - -from datetime import datetime, timedelta - -from airflow.models.dag import DAG -from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator -from airflow.utils.trigger_rule import TriggerRule - -DEFAULT_DATE = datetime(2016, 1, 1) -default_args = dict(start_date=DEFAULT_DATE, owner="airflow") - - -def fail(): - raise ValueError("Expected failure.") - - -# DAG tests backfill with pooled tasks -# Previously backfill would queue the task but never run it -dag1 = DAG( - dag_id="test_backfill_pooled_task_dag", - schedule=timedelta(days=1), - default_args=default_args, -) -dag1_task1 = EmptyOperator( - task_id="test_backfill_pooled_task", - dag=dag1, - pool="test_backfill_pooled_task_pool", -) - -# dag2 has been moved to test_prev_dagrun_dep.py - -# DAG tests that a Dag run that doesn't complete is marked failed -dag3 = DAG( - dag_id="test_dagrun_states_fail", - schedule=timedelta(days=1), - default_args=default_args, -) -dag3_task1 = PythonOperator(task_id="test_dagrun_fail", dag=dag3, python_callable=fail) -dag3_task2 = EmptyOperator( - task_id="test_dagrun_succeed", - dag=dag3, -) -dag3_task2.set_upstream(dag3_task1) - -# DAG tests that a Dag run that completes but has a failure is marked success -dag4 = DAG( - dag_id="test_dagrun_states_success", - schedule=timedelta(days=1), - default_args=default_args, -) -dag4_task1 = PythonOperator( - task_id="test_dagrun_fail", - dag=dag4, - python_callable=fail, -) -dag4_task2 = EmptyOperator(task_id="test_dagrun_succeed", dag=dag4, trigger_rule=TriggerRule.ALL_FAILED) -dag4_task2.set_upstream(dag4_task1) - -# DAG tests that a Dag run that completes but has a root failure is marked fail -dag5 = DAG( - dag_id="test_dagrun_states_root_fail", - schedule=timedelta(days=1), - default_args=default_args, -) -dag5_task1 = EmptyOperator( - task_id="test_dagrun_succeed", - dag=dag5, -) -dag5_task2 = PythonOperator( - task_id="test_dagrun_fail", - dag=dag5, - python_callable=fail, -) - -# DAG tests that a Dag run that is deadlocked with no states is failed -dag6 = DAG( - dag_id="test_dagrun_states_deadlock", - schedule=timedelta(days=1), - default_args=default_args, -) -dag6_task1 = EmptyOperator( - task_id="test_depends_on_past", - depends_on_past=True, - dag=dag6, -) -dag6_task2 = EmptyOperator( - task_id="test_depends_on_past_2", - depends_on_past=True, - dag=dag6, -) -dag6_task2.set_upstream(dag6_task1) - - -# DAG tests that a Dag run that doesn't complete but has a root failure is marked running -dag8 = DAG( - dag_id="test_dagrun_states_root_fail_unfinished", - schedule=timedelta(days=1), - default_args=default_args, -) -dag8_task1 = EmptyOperator( - task_id="test_dagrun_unfinished", # The test will unset the task instance state after - # running this test - dag=dag8, -) -dag8_task2 = PythonOperator( - task_id="test_dagrun_fail", - dag=dag8, - python_callable=fail, -) - -# DAG tests that a Dag run that completes but has a root in the future is marked as success -dag9 = DAG( - dag_id="test_dagrun_states_root_future", - schedule=timedelta(days=1), - default_args=default_args, -) -dag9_task1 = EmptyOperator( - task_id="current", - dag=dag9, -) -dag9_task2 = EmptyOperator( - task_id="future", - dag=dag9, - start_date=DEFAULT_DATE + timedelta(days=1), -) diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index f1bb6f17d05c..dead9be86230 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -32,7 +32,6 @@ from 
airflow.cli import cli_parser from airflow.exceptions import ( AirflowException, - AirflowTaskTimeout, BackfillUnfinished, DagConcurrencyLimitReached, NoAvailablePoolSlot, @@ -54,7 +53,6 @@ from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow.utils.timeout import timeout from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import DagRunType from tests.listeners import dag_listener @@ -1070,34 +1068,6 @@ def test_backfill_ordered_concurrent_execute(self, dag_maker, mock_executor): ], ] - def test_backfill_pooled_tasks(self): - """ - Test that queued tasks are executed by BackfillJobRunner - """ - session = settings.Session() - pool = Pool(pool="test_backfill_pooled_task_pool", slots=1, include_deferred=False) - session.add(pool) - session.commit() - session.close() - - dag = self.dagbag.get_dag("test_backfill_pooled_task_dag") - dag.clear() - - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - - # run with timeout because this creates an infinite loop if not - # caught - try: - with timeout(seconds=20): - run_job(job=job, execute_callable=job_runner._execute) - except AirflowTaskTimeout: - logger.info("Timeout while waiting for task to complete") - run_id = f"backfill__{DEFAULT_DATE.isoformat()}" - ti = TI(task=dag.get_task("test_backfill_pooled_task"), run_id=run_id) - ti.refresh_from_db() - assert ti.state == State.SUCCESS - @pytest.mark.parametrize("ignore_depends_on_past", [True, False]) def test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past( self, ignore_depends_on_past, mock_executor diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index a5748ebaeef9..639a3528fad8 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -43,7 +43,8 @@ from airflow.callbacks.database_callback_sink import DatabaseCallbackSink from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.dag_processing.manager import DagFileProcessorAgent -from airflow.exceptions import AirflowException, RemovedInAirflow3Warning +from airflow.decorators import task +from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_constants import MOCK_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader @@ -2828,128 +2829,33 @@ def test_do_not_schedule_removed_task(self, dag_maker): assert [] == res - @provide_session - def evaluate_dagrun( - self, - dag_id, - expected_task_states, # dict of task_id: state - dagrun_state, - run_kwargs=None, - advance_execution_date=False, - session=None, - ): - """ - Helper for testing DagRun states with simple two-task DAGs. - This is hackish: a dag run is created but its tasks are - run by a backfill. 
- """ - - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - - if run_kwargs is None: - run_kwargs = {} - - dag = self.dagbag.get_dag(dag_id) - dagrun_info = dag.next_dagrun_info(None) - assert dagrun_info is not None - data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - execution_date=dagrun_info.logical_date, - state=None, - session=session, - data_interval=data_interval, - **triggered_by_kwargs, - ) - - if advance_execution_date: - # run a second time to schedule a dagrun after the start_date - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - execution_date=dr.data_interval_end, - state=None, - session=session, - data_interval=data_interval, - **triggered_by_kwargs, - ) - ex_date = dr.execution_date - - for tid, state in expected_task_states.items(): - if state == State.FAILED: - self.null_exec.mock_task_fail(dag_id, tid, dr.run_id) - - try: - dag = DagBag().get_dag(dag.dag_id) - # This needs a _REAL_ dag, not the serialized version - assert not isinstance(dag, SerializedDAG) - # TODO: Can this be replaced with `self.run_scheduler_until_dagrun_terminal. `dag.run` isn't - # great to use here as it uses BackfillJobRunner! - for _ in _mock_executor(self.null_exec): - dag.run(start_date=ex_date, end_date=ex_date, **run_kwargs) - except AirflowException: - pass - - # load dagrun - dr = DagRun.find(dag_id=dag_id, execution_date=ex_date, session=session) - dr = dr[0] - dr.dag = dag - - assert dr.state == dagrun_state - - # test tasks - for task_id, expected_state in expected_task_states.items(): - ti = dr.get_task_instance(task_id) - assert ti.state == expected_state - - def test_dagrun_fail(self): + @pytest.mark.parametrize( + "ti_states, run_state", + [ + (["failed", "success"], "failed"), + (["success", "success"], "success"), + ], + ) + def test_dagrun_state_correct(self, ti_states, run_state, dag_maker, session): """ DagRuns with one failed and one incomplete root task -> FAILED """ - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - self.evaluate_dagrun( - dag_id="test_dagrun_states_fail", - expected_task_states={ - "test_dagrun_fail": State.FAILED, - "test_dagrun_succeed": State.UPSTREAM_FAILED, - }, - dagrun_state=State.FAILED, - ) + with dag_maker(): - def test_dagrun_success(self): - """ - DagRuns with one failed and one successful root task -> SUCCESS - """ - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - self.evaluate_dagrun( - dag_id="test_dagrun_states_success", - expected_task_states={ - "test_dagrun_fail": State.FAILED, - "test_dagrun_succeed": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - ) + @task + def my_task(): ... 
- def test_dagrun_root_fail(self): - """ - DagRuns with one successful and one failed root task -> FAILED - """ - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - self.evaluate_dagrun( - dag_id="test_dagrun_states_root_fail", - expected_task_states={ - "test_dagrun_succeed": State.SUCCESS, - "test_dagrun_fail": State.FAILED, - }, - dagrun_state=State.FAILED, - ) + for _ in ti_states: + my_task() + dr = dag_maker.create_dagrun(state="running", triggered_by=DagRunTriggeredByType.TIMETABLE) + for idx, state in enumerate(ti_states): + dr.task_instances[idx].state = state + session.commit() + scheduler_job = Job(executor=self.null_exec) + self.job_runner = SchedulerJobRunner(job=scheduler_job) + self.job_runner.processor_agent = mock.MagicMock() + self.job_runner._do_scheduling(session) + assert session.query(DagRun).one().state == run_state def test_dagrun_root_after_dagrun_unfinished(self, mock_executor): """ @@ -2963,57 +2869,15 @@ def test_dagrun_root_after_dagrun_unfinished(self, mock_executor): dag.sync_to_db() scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, subdir=dag.fileloc) + self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=2, subdir=dag.fileloc) run_job(scheduler_job, execute_callable=self.job_runner._execute) - first_run = DagRun.find(dag_id=dag_id, execution_date=DEFAULT_DATE)[0] + first_run = DagRun.find(dag_id=dag_id)[0] ti_ids = [(ti.task_id, ti.state) for ti in first_run.get_task_instances()] assert ti_ids == [("current", State.SUCCESS)] assert first_run.state in [State.SUCCESS, State.RUNNING] - def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): - """ - DagRun is marked a success if ignore_first_depends_on_past=True - - Test that an otherwise-deadlocked dagrun is marked as a success - if ignore_first_depends_on_past=True and the dagrun execution_date - is after the start_date. - """ - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - self.evaluate_dagrun( - dag_id="test_dagrun_states_deadlock", - expected_task_states={ - "test_depends_on_past": State.SUCCESS, - "test_depends_on_past_2": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - advance_execution_date=True, - run_kwargs=dict(ignore_first_depends_on_past=True), - ) - - def test_dagrun_deadlock_ignore_depends_on_past(self): - """ - Test that ignore_first_depends_on_past doesn't affect results - (this is the same test as - test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except - that start_date == execution_date so depends_on_past is irrelevant). 
- """ - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - self.evaluate_dagrun( - dag_id="test_dagrun_states_deadlock", - expected_task_states={ - "test_depends_on_past": State.SUCCESS, - "test_depends_on_past_2": State.SUCCESS, - }, - dagrun_state=State.SUCCESS, - run_kwargs=dict(ignore_first_depends_on_past=True), - ) - @pytest.mark.parametrize( "configs", [ @@ -3136,9 +3000,14 @@ def test_scheduler_multiprocessing(self, configs): Test that the scheduler can successfully queue multiple dags in parallel """ with conf_vars(configs): - dag_ids = ["test_start_date_scheduling", "test_dagrun_states_success"] + dag_ids = [ + "test_start_date_scheduling", + "test_task_start_date_scheduling", + ] for dag_id in dag_ids: dag = self.dagbag.get_dag(dag_id) + if not dag: + raise ValueError(f"could not find dag {dag_id}") dag.clear() scheduler_job = Job( diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 997ef06329fd..67dc699fc3c8 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -25,7 +25,6 @@ import re import weakref from datetime import timedelta -from importlib import reload from pathlib import Path from typing import TYPE_CHECKING from unittest import mock @@ -45,12 +44,8 @@ AirflowException, DuplicateTaskIdFound, ParamValidationError, - RemovedInAirflow3Warning, UnknownExecutorException, ) -from airflow.executors import executor_loader -from airflow.executors.local_executor import LocalExecutor -from airflow.executors.sequential_executor import SequentialExecutor from airflow.models.asset import ( AssetAliasModel, AssetDagRunQueue, @@ -2740,20 +2735,6 @@ def test_asset_expression(self, session: Session) -> None: ] } - @mock.patch("airflow.models.dag.run_job") - def test_dag_executors(self, run_job_mock): - # todo: AIP-78 remove along with DAG.run() - # this only tests the backfill job runner, not the scheduler - with pytest.warns(RemovedInAirflow3Warning): - dag = DAG(dag_id="test", schedule=None) - reload(executor_loader) - with conf_vars({("core", "executor"): "SequentialExecutor"}): - dag.run() - assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor) - - dag.run(local=True) - assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor) - class TestQueries: def setup_method(self) -> None: diff --git a/tests/models/test_xcom_arg.py b/tests/models/test_xcom_arg.py index b161020d1fb9..fbdd500661d2 100644 --- a/tests/models/test_xcom_arg.py +++ b/tests/models/test_xcom_arg.py @@ -23,7 +23,6 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.utils.types import NOTSET -from dev.tests_common.test_utils.config import conf_vars from dev.tests_common.test_utils.db import clear_db_dags, clear_db_runs pytestmark = pytest.mark.db_test @@ -146,7 +145,6 @@ def test_xcom_not_iterable(self, dag_maker): @pytest.mark.system("core") class TestXComArgRuntime: - @conf_vars({("core", "executor"): "DebugExecutor"}) def test_xcom_pass_to_op(self, dag_maker): with dag_maker(dag_id="test_xcom_pass_to_op") as dag: operator = PythonOperator( @@ -161,9 +159,8 @@ def test_xcom_pass_to_op(self, dag_maker): task_id="assert_is_value_1", ) operator >> operator2 - dag.run() + dag.test() - @conf_vars({("core", "executor"): "DebugExecutor"}) def test_xcom_push_and_pass(self, dag_maker): def push_xcom_value(key, value, **context): ti = context["task_instance"] @@ -182,7 +179,7 @@ def 
push_xcom_value(key, value, **context): op_args=[xarg], ) op1 >> op2 - dag.run() + dag.test() @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode