From 7aa844d63e60248cd3406f8142fd6cea852d1aff Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Sun, 13 Oct 2024 18:15:11 -0700
Subject: [PATCH] Remove BackfillJobRunner class (#42943)

---
 airflow/cli/cli_config.py                     |    2 +-
 airflow/exceptions.py                         |   25 -
 airflow/jobs/backfill_job_runner.py           | 1106 ---------
 airflow/jobs/job.py                           |    4 +-
 airflow/models/dag.py                         |    3 +-
 airflow/models/dagpickle.py                   |    2 +-
 airflow/models/dagrun.py                      |    2 +-
 airflow/models/taskinstance.py                |    2 +-
 airflow/ti_deps/dependencies_deps.py          |    8 -
 airflow/ti_deps/dependencies_states.py        |    9 -
 .../listeners.rst                             |    2 +-
 docs/apache-airflow/howto/listener-plugin.rst |    4 +-
 tests/core/test_impersonation_tests.py        |   24 +-
 tests/jobs/test_backfill_job.py               | 2094 -----------------
 tests/jobs/test_scheduler_job.py              |   29 +-
 15 files changed, 29 insertions(+), 3287 deletions(-)
 delete mode 100644 airflow/jobs/backfill_job_runner.py
 delete mode 100644 tests/jobs/test_backfill_job.py

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index e68a464f61b..8fe239c27ba 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -924,7 +924,7 @@ def string_lower_type(val):
 # jobs check
 ARG_JOB_TYPE_FILTER = Arg(
     ("--job-type",),
-    choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob", "DagProcessorJob"),
+    choices=("LocalTaskJob", "SchedulerJob", "TriggererJob", "DagProcessorJob"),
     action="store",
     help="The type of job(s) that will be checked.",
 )
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 55dd02fdae3..ccf62ca5e81 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -327,31 +327,6 @@ class PoolNotFound(AirflowNotFoundException):
     """Raise when a Pool is not available in the system."""


-class NoAvailablePoolSlot(AirflowException):
-    """Raise when there is not enough slots in pool."""
-
-
-class DagConcurrencyLimitReached(AirflowException):
-    """Raise when DAG max_active_tasks limit is reached."""
-
-
-class TaskConcurrencyLimitReached(AirflowException):
-    """Raise when task max_active_tasks limit is reached."""
-
-
-class BackfillUnfinished(AirflowException):
-    """
-    Raises when not all tasks succeed in backfill.
-
-    :param message: The human-readable description of the exception
-    :param ti_status: The information about all task statuses
-    """
-
-    def __init__(self, message, ti_status):
-        super().__init__(message)
-        self.ti_status = ti_status
-
-
 class FileSyntaxError(NamedTuple):
     """Information about a single error in a file."""

diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
deleted file mode 100644
index 19dda4d6982..00000000000
--- a/airflow/jobs/backfill_job_runner.py
+++ /dev/null
@@ -1,1106 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
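The BackfillUnfinished exception removed above carried the runner's full _DagRunTaskStatus snapshot, so a caller could inspect exactly which task instances never completed. A minimal sketch of the pre-removal calling pattern, assuming a dag object and the start/end dates are already in scope (the Job/run_job wiring mirrors the deleted tests later in this patch):

    from airflow.exceptions import BackfillUnfinished
    from airflow.jobs.backfill_job_runner import BackfillJobRunner
    from airflow.jobs.job import Job, run_job

    job = Job()
    job_runner = BackfillJobRunner(job=job, dag=dag, start_date=start, end_date=end)
    try:
        run_job(job=job, execute_callable=job_runner._execute)
    except BackfillUnfinished as err:
        print(err)  # the tabulated summary assembled by _collect_errors()
        # ti_status.failed holds TaskInstanceKey objects, while
        # ti_status.deadlocked holds TaskInstance objects, hence the .key lookup
        to_investigate = err.ti_status.failed | {ti.key for ti in err.ti_status.deadlocked}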
-from __future__ import annotations - -import time -from typing import TYPE_CHECKING, Any, Iterable, Iterator, Mapping, Sequence - -import attr -import pendulum -from sqlalchemy import case, or_, select, tuple_, update -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm.session import make_transient -from tabulate import tabulate - -from airflow import models -from airflow.exceptions import ( - AirflowException, - BackfillUnfinished, - DagConcurrencyLimitReached, - NoAvailablePoolSlot, - PoolNotFound, - TaskConcurrencyLimitReached, - UnknownExecutorException, -) -from airflow.executors.executor_loader import ExecutorLoader -from airflow.jobs.base_job_runner import BaseJobRunner -from airflow.jobs.job import Job, perform_heartbeat -from airflow.models import DAG, DagPickle -from airflow.models.dagrun import DagRun -from airflow.models.taskinstance import TaskInstance -from airflow.ti_deps.dep_context import DepContext -from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS -from airflow.timetables.base import DagRunInfo -from airflow.utils import helpers, timezone -from airflow.utils.configuration import tmp_configuration_copy -from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow.utils.types import DagRunTriggeredByType, DagRunType - -if TYPE_CHECKING: - import datetime - - from sqlalchemy.orm.session import Session - - from airflow.executors.base_executor import BaseExecutor - from airflow.models.abstractoperator import AbstractOperator - from airflow.models.taskinstance import TaskInstanceKey - - -class BackfillJobRunner(BaseJobRunner, LoggingMixin): - """ - A backfill job runner consists of a dag for a specific time range. - - It triggers a set of task instance runs, in the right order and lasts for - as long as it takes for the set of task instance to be completed. - """ - - job_type = "BackfillJob" - - STATES_COUNT_AS_RUNNING = (TaskInstanceState.RUNNING, TaskInstanceState.QUEUED) - - @attr.define - class _DagRunTaskStatus: - """ - Internal status of the backfill job. - - This class is intended to be instantiated only within a BackfillJobRunner - instance and will track the execution of tasks, e.g. running, skipped, - succeeded, failed, etc. Information about the dag runs related to the - backfill job are also being tracked in this structure, e.g. finished runs, etc. - Any other status related information related to the execution of dag runs / tasks - can be included in this structure since it makes it easier to pass it around. 
- - :param to_run: Tasks to run in the backfill - :param running: Maps running task instance key to task instance object - :param skipped: Tasks that have been skipped - :param succeeded: Tasks that have succeeded so far - :param failed: Tasks that have failed - :param not_ready: Tasks not ready for execution - :param deadlocked: Deadlocked tasks - :param active_runs: Active dag runs at a certain point in time - :param executed_dag_run_dates: Datetime objects for the executed dag runs - :param finished_runs: Number of finished runs so far - :param total_runs: Number of total dag runs able to run - """ - - to_run: dict[TaskInstanceKey, TaskInstance] = attr.ib(factory=dict) - running: dict[TaskInstanceKey, TaskInstance] = attr.ib(factory=dict) - skipped: set[TaskInstanceKey] = attr.ib(factory=set) - succeeded: set[TaskInstanceKey] = attr.ib(factory=set) - failed: set[TaskInstanceKey] = attr.ib(factory=set) - not_ready: set[TaskInstanceKey] = attr.ib(factory=set) - deadlocked: set[TaskInstance] = attr.ib(factory=set) - active_runs: set[DagRun] = attr.ib(factory=set) - executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set) - finished_runs: int = 0 - total_runs: int = 0 - - def __init__( - self, - job: Job, - dag: DAG, - start_date=None, - end_date=None, - mark_success=False, - donot_pickle=False, - ignore_first_depends_on_past=False, - ignore_task_deps=False, - pool=None, - delay_on_limit_secs=1.0, - verbose=False, - conf=None, - rerun_failed_tasks=False, - run_backwards=False, - run_at_least_once=False, - continue_on_failures=False, - disable_retry=False, - ) -> None: - """ - Create a BackfillJobRunner. - - :param dag: DAG object. - :param start_date: start date for the backfill date range. - :param end_date: end date for the backfill date range. - :param mark_success: flag whether to mark the task auto success. - :param donot_pickle: whether pickle - :param ignore_first_depends_on_past: whether to ignore depend on past - :param ignore_task_deps: whether to ignore the task dependency - :param pool: pool to backfill - :param delay_on_limit_secs: - :param verbose: - :param conf: a dictionary which user could pass k-v pairs for backfill - :param rerun_failed_tasks: flag to whether to - auto rerun the failed task in backfill - :param run_backwards: Whether to process the dates from most to least recent - :param run_at_least_once: If true, always run the DAG at least once even - if no logical run exists within the time range. - :param args: - :param kwargs: - """ - super().__init__(job) - self.dag = dag - self.dag_id = dag.dag_id - self.bf_start_date = start_date - self.bf_end_date = end_date - self.mark_success = mark_success - self.donot_pickle = donot_pickle - self.ignore_first_depends_on_past = ignore_first_depends_on_past - self.ignore_task_deps = ignore_task_deps - self.pool = pool - self.delay_on_limit_secs = delay_on_limit_secs - self.verbose = verbose - self.conf = conf - self.rerun_failed_tasks = rerun_failed_tasks - self.run_backwards = run_backwards - self.run_at_least_once = run_at_least_once - self.continue_on_failures = continue_on_failures - self.disable_retry = disable_retry - - def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> None: - """ - Update the counters per state of the tasks that were running. - - Can re-add to tasks to run when required. 
- - :param ti_status: the internal status of the backfill job tasks - """ - tis_to_be_scheduled = [] - refreshed_tis = [] - TI = TaskInstance - - ti_primary_key_to_ti_key = {ti_key.primary: ti_key for ti_key in ti_status.running.keys()} - - filter_for_tis = TI.filter_for_tis(list(ti_status.running.values())) - if filter_for_tis is not None: - refreshed_tis = session.scalars(select(TI).where(filter_for_tis)).all() - - for ti in refreshed_tis: - # Use primary key to match in memory information - ti_key = ti_primary_key_to_ti_key[ti.key.primary] - if ti.state == TaskInstanceState.SUCCESS: - ti_status.succeeded.add(ti_key) - self.log.debug("Task instance %s succeeded. Don't rerun.", ti) - ti_status.running.pop(ti_key) - continue - if ti.state == TaskInstanceState.SKIPPED: - ti_status.skipped.add(ti_key) - self.log.debug("Task instance %s skipped. Don't rerun.", ti) - ti_status.running.pop(ti_key) - continue - if ti.state == TaskInstanceState.FAILED: - self.log.error("Task instance %s failed", ti) - ti_status.failed.add(ti_key) - ti_status.running.pop(ti_key) - continue - # special case: if the task needs to run again put it back - if ti.state == TaskInstanceState.UP_FOR_RETRY: - self.log.warning("Task instance %s is up for retry", ti) - ti_status.running.pop(ti_key) - ti_status.to_run[ti.key] = ti - # special case: if the task needs to be rescheduled put it back - elif ti.state == TaskInstanceState.UP_FOR_RESCHEDULE: - self.log.warning("Task instance %s is up for reschedule", ti) - ti_status.running.pop(ti_key) - ti_status.to_run[ti.key] = ti - # special case: The state of the task can be set to NONE by the task itself - # when it reaches concurrency limits. It could also happen when the state - # is changed externally, e.g. by clearing tasks from the ui. We need to cover - # for that as otherwise those tasks would fall outside the scope of - # the backfill suddenly. - elif ti.state is None: - self.log.warning( - "FIXME: task instance %s state was set to none externally or " - "reaching concurrency limits. Re-adding task to queue.", - ti, - ) - tis_to_be_scheduled.append(ti) - ti_status.running.pop(ti_key) - ti_status.to_run[ti.key] = ti - # special case: Deferrable task can go from DEFERRED to SCHEDULED; - # when that happens, we need to put it back as in UP_FOR_RESCHEDULE - elif ti.state == TaskInstanceState.SCHEDULED: - self.log.debug("Task instance %s is resumed from deferred state", ti) - ti_status.running.pop(ti_key) - ti_status.to_run[ti.key] = ti - - # Batch schedule of task instances - if tis_to_be_scheduled: - filter_for_tis = TI.filter_for_tis(tis_to_be_scheduled) - session.execute( - update(TI) - .where(filter_for_tis) - .values( - state=TaskInstanceState.SCHEDULED, - try_number=case( - ( - or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE), - TI.try_number + 1, - ), - else_=TI.try_number, - ), - ) - .execution_options(synchronize_session=False) - ) - session.flush() - - def _manage_executor_state( - self, - running: Mapping[TaskInstanceKey, TaskInstance], - executor: BaseExecutor, - session: Session, - ) -> Iterator[tuple[AbstractOperator, str, Sequence[TaskInstance], int]]: - """ - Compare task instances' states with that of the executor. - - Expands downstream mapped tasks when necessary. 
- - :param running: dict of key, task to verify - :return: An iterable of expanded TaskInstance per MappedTask - """ - # list of tuples (dag_id, task_id, execution_date, map_index) of running tasks in executor - buffered_events = list(executor.get_event_buffer().items()) - running_tis_ids = [ - (key.dag_id, key.task_id, key.run_id, key.map_index) - for key, _ in buffered_events - if key in running - ] - # list of TaskInstance of running tasks in executor (refreshed from db in batch) - refreshed_running_tis = session.scalars( - select(TaskInstance).where( - tuple_( - TaskInstance.dag_id, - TaskInstance.task_id, - TaskInstance.run_id, - TaskInstance.map_index, - ).in_(running_tis_ids) - ) - ).all() - # dict of refreshed TaskInstance by key to easily find them - running_dict = {(ti.dag_id, ti.task_id, ti.run_id, ti.map_index): ti for ti in refreshed_running_tis} - need_refresh = False - - for key, value in buffered_events: - state, info = value - ti_key = (key.dag_id, key.task_id, key.run_id, key.map_index) - if ti_key not in running_dict: - self.log.warning("%s state %s not in running=%s", key, state, running.values()) - continue - - ti = running_dict[ti_key] - if need_refresh: - ti.refresh_from_db(session=session) - - self.log.debug("Executor state: %s task %s", state, ti) - - if ( - state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) - and ti.state in self.STATES_COUNT_AS_RUNNING - ): - msg = ( - f"The executor reported that the task instance {ti} finished with state {state}, " - f"but the task instance's state attribute is {ti.state}. " - "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" - ) - if info is not None: - msg += f" Extra info: {info}" - self.log.error(msg) - ti.handle_failure(error=msg) - continue - - def _iter_task_needing_expansion() -> Iterator[AbstractOperator]: - from airflow.models.mappedoperator import AbstractOperator - - for node in self.dag.get_task(ti.task_id).iter_mapped_dependants(): - if isinstance(node, AbstractOperator): - yield node - else: # A (mapped) task group. All its children need expansion. - yield from node.iter_tasks() - - if ti.state not in self.STATES_COUNT_AS_RUNNING: - # Don't use ti.task; if this task is mapped, that attribute - # would hold the unmapped task. We need to original task here. - for node in _iter_task_needing_expansion(): - new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session) - yield node, ti.run_id, new_tis, num_mapped_tis - - @provide_session - def _get_dag_run( - self, - dagrun_info: DagRunInfo, - dag: DAG, - session: Session = NEW_SESSION, - ) -> DagRun | None: - """ - Return an existing dag run for the given run date or create one. - - If the max_active_runs limit is reached, this function will return None. 
- - :param dagrun_info: Schedule information for the dag run - :param dag: DAG - :param session: the database session object - :return: a DagRun in state RUNNING or None - """ - run_date = dagrun_info.logical_date - - respect_dag_max_active_limit = bool(dag.timetable.can_be_scheduled) - - current_active_dag_count = dag.get_num_active_runs(external_trigger=False) - - # check if we are scheduling on top of an already existing DAG run - # we could find a "scheduled" run instead of a "backfill" - runs = DagRun.find(dag_id=dag.dag_id, execution_date=run_date, session=session) - run: DagRun | None - if runs: - run = runs[0] - if run.state == DagRunState.RUNNING: - respect_dag_max_active_limit = False - # Fixes --conf overwrite for backfills with already existing DagRuns - run.conf = self.conf or {} - # start_date is cleared for existing DagRuns - run.start_date = timezone.utcnow() - else: - run = None - - # enforce max_active_runs limit for dag, special cases already - # handled by respect_dag_max_active_limit - if respect_dag_max_active_limit and current_active_dag_count >= dag.max_active_runs: - return None - - run = run or dag.create_dagrun( - execution_date=run_date, - data_interval=dagrun_info.data_interval, - start_date=timezone.utcnow(), - state=DagRunState.RUNNING, - external_trigger=False, - session=session, - conf=self.conf, - run_type=DagRunType.BACKFILL_JOB, - creating_job_id=self.job.id, - triggered_by=DagRunTriggeredByType.TIMETABLE, - ) - - # set required transient field - run.dag = dag - - # explicitly mark as backfill and running - run.state = DagRunState.RUNNING - run.run_type = DagRunType.BACKFILL_JOB - run.verify_integrity(session=session) - - run.notify_dagrun_state_changed(msg="started") - return run - - @provide_session - def _task_instances_for_dag_run( - self, - dag: DAG, - dag_run: DagRun, - session: Session = NEW_SESSION, - ) -> dict[TaskInstanceKey, TaskInstance]: - """ - Return a map of task instance keys to task instance objects for the given dag run. - - :param dag_run: the dag run to get the tasks from - :param session: the database session object - """ - tasks_to_run = {} - - if dag_run is None: - return tasks_to_run - - # check if we have orphaned tasks - self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session) - - # for some reason if we don't refresh the reference to run is lost - dag_run.refresh_from_db(session=session) - make_transient(dag_run) - - dag_run.dag = dag - info = dag_run.task_instance_scheduling_decisions(session=session) - schedulable_tis = info.schedulable_tis - try: - for ti in dag_run.get_task_instances(session=session): - if ti in schedulable_tis: - if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE: - ti.try_number += 1 - ti.set_state(TaskInstanceState.SCHEDULED) - if ti.state != TaskInstanceState.REMOVED: - tasks_to_run[ti.key] = ti - session.commit() - except Exception: - session.rollback() - raise - return tasks_to_run - - def _log_progress(self, ti_status: _DagRunTaskStatus) -> None: - self.log.info( - "[backfill progress] | finished run %s of %s | tasks waiting: %s | succeeded: %s | " - "running: %s | failed: %s | skipped: %s | deadlocked: %s | not ready: %s", - ti_status.finished_runs, - ti_status.total_runs, - len(ti_status.to_run), - len(ti_status.succeeded), - len(ti_status.running), - len(ti_status.failed), - len(ti_status.skipped), - len(ti_status.deadlocked), - len(ti_status.not_ready), - ) - - self.log.debug("Finished dag run loop iteration. 
Remaining tasks %s", ti_status.to_run.values()) - - def _process_backfill_task_instances( - self, - ti_status: _DagRunTaskStatus, - pickle_id: int | None, - start_date: datetime.datetime | None = None, - *, - session: Session, - ) -> list: - """ - Process a set of task instances from a set of DAG runs. - - Special handling is done to account for different task instance states - that could be present when running them in a backfill process. - - :param ti_status: the internal status of the job - :param executor: the executor to run the task instances - :param pickle_id: the pickle_id if dag is pickled, None otherwise - :param start_date: the start date of the backfill job - :param session: the current session object - :return: the list of execution_dates for the finished dag runs - """ - executed_run_dates = [] - - while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked: - self.log.debug("Clearing out not_ready list") - ti_status.not_ready.clear() - - # we need to execute the tasks bottom to top - # or leaf to root, as otherwise tasks might be - # determined deadlocked while they are actually - # waiting for their upstream to finish - def _per_task_process(key, ti: TaskInstance, session): - ti.refresh_from_db(lock_for_update=True, session=session) - - task = self.dag.get_task(ti.task_id) - ti.task = task - - self.log.debug("Task instance to run %s state %s", ti, ti.state) - - # The task was already marked successful or skipped by a - # different Job. Don't rerun it. - if ti.state == TaskInstanceState.SUCCESS: - ti_status.succeeded.add(key) - self.log.debug("Task instance %s succeeded. Don't rerun.", ti) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - return - elif ti.state == TaskInstanceState.SKIPPED: - ti_status.skipped.add(key) - self.log.debug("Task instance %s skipped. Don't rerun.", ti) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - return - - if self.rerun_failed_tasks: - # Rerun failed tasks or upstreamed failed tasks - if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED): - self.log.error("Task instance %s with state %s", ti, ti.state) - if key in ti_status.running: - ti_status.running.pop(key) - # Reset the failed task in backfill to scheduled state - ti.try_number += 1 - ti.set_state(TaskInstanceState.SCHEDULED, session=session) - if ti.dag_run not in ti_status.active_runs: - ti_status.active_runs.add(ti.dag_run) - else: - # Default behaviour which works for subdag. - if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED): - self.log.error("Task instance %s with state %s", ti, ti.state) - ti_status.failed.add(key) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - return - - if self.ignore_first_depends_on_past: - dagrun = ti.get_dagrun(session=session) - ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date) - else: - ignore_depends_on_past = False - - backfill_context = DepContext( - deps=BACKFILL_QUEUED_DEPS, - ignore_depends_on_past=ignore_depends_on_past, - ignore_task_deps=self.ignore_task_deps, - wait_for_past_depends_before_skipping=False, - flag_upstream_failed=True, - ) - - executor = ExecutorLoader.load_executor(str(ti.executor) if ti.executor else None) - # Is the task runnable? 
-- then run it - # the dependency checker can change states of tis - if ti.are_dependencies_met( - dep_context=backfill_context, session=session, verbose=self.verbose - ): - if executor.has_task(ti): - self.log.debug("Task Instance %s already in executor waiting for queue to clear", ti) - else: - self.log.debug("Sending %s to executor", ti) - # Skip scheduled state, we are executing immediately - if ti.state in (TaskInstanceState.UP_FOR_RETRY, None): - # i am not sure why this is necessary. - # seemingly a quirk of backfill runner. - # it should be handled elsewhere i think. - # seems the leaf tasks are set SCHEDULED but others not. - # but i am not going to look too closely since we need - # to nuke the current backfill approach anyway. - ti.try_number += 1 - ti.state = TaskInstanceState.QUEUED - ti.queued_by_job_id = self.job.id - ti.queued_dttm = timezone.utcnow() - session.merge(ti) - try: - session.commit() - except OperationalError: - self.log.exception("Failed to commit task state change due to operational error") - session.rollback() - # early exit so the outer loop can retry - return - - cfg_path = None - - if executor.is_local: - cfg_path = tmp_configuration_copy() - - executor.queue_task_instance( - ti, - mark_success=self.mark_success, - pickle_id=pickle_id, - ignore_task_deps=self.ignore_task_deps, - ignore_depends_on_past=ignore_depends_on_past, - wait_for_past_depends_before_skipping=False, - pool=self.pool, - cfg_path=cfg_path, - ) - ti_status.running[key] = ti - ti_status.to_run.pop(key) - return - - if ti.state == TaskInstanceState.UPSTREAM_FAILED: - self.log.error("Task instance %s upstream failed", ti) - ti_status.failed.add(key) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - return - - # special case - if ti.state == TaskInstanceState.UP_FOR_RETRY: - self.log.debug("Task instance %s retry period not expired yet", ti) - if key in ti_status.running: - ti_status.running.pop(key) - ti_status.to_run[key] = ti - return - - # special case - if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE: - self.log.debug("Task instance %s reschedule period not expired yet", ti) - if key in ti_status.running: - ti_status.running.pop(key) - ti_status.to_run[key] = ti - return - - # all remaining tasks - self.log.debug("Adding %s to not_ready", ti) - ti_status.not_ready.add(key) - - try: - for task in self.dag.topological_sort(): - for key, ti in list(ti_status.to_run.items()): - # Attempt to workaround deadlock on backfill by attempting to commit the transaction - # state update few times before giving up - max_attempts = 5 - for i in range(max_attempts): - if task.task_id != ti.task_id: - continue - - pool = session.scalar( - select(models.Pool).where(models.Pool.pool == task.pool).limit(1) - ) - if not pool: - raise PoolNotFound(f"Unknown pool: {task.pool}") - - open_slots = pool.open_slots(session=session) - if open_slots <= 0: - raise NoAvailablePoolSlot( - f"Not scheduling since there are {open_slots} " - f"open slots in pool {task.pool}" - ) - - num_running_task_instances_in_dag = DAG.get_num_task_instances( - self.dag_id, - states=self.STATES_COUNT_AS_RUNNING, - session=session, - ) - - if num_running_task_instances_in_dag >= self.dag.max_active_tasks: - raise DagConcurrencyLimitReached( - "Not scheduling since DAG max_active_tasks limit is reached." 
- ) - - if task.max_active_tis_per_dag is not None: - num_running_task_instances_in_task = DAG.get_num_task_instances( - dag_id=self.dag_id, - task_ids=[task.task_id], - states=self.STATES_COUNT_AS_RUNNING, - session=session, - ) - - if num_running_task_instances_in_task >= task.max_active_tis_per_dag: - raise TaskConcurrencyLimitReached( - "Not scheduling since Task concurrency limit is reached." - ) - - if task.max_active_tis_per_dagrun is not None: - num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances( - dag_id=self.dag_id, - run_id=ti.run_id, - task_ids=[task.task_id], - states=self.STATES_COUNT_AS_RUNNING, - session=session, - ) - - if ( - num_running_task_instances_in_task_dagrun - >= task.max_active_tis_per_dagrun - ): - raise TaskConcurrencyLimitReached( - "Not scheduling since Task concurrency per DAG run limit is reached." - ) - - _per_task_process(key, ti, session) - try: - session.commit() - except OperationalError: - self.log.exception( - "Failed to commit task state due to operational error. " - "The job will retry this operation so if your backfill succeeds, " - "you can safely ignore this message.", - ) - session.rollback() - if i == max_attempts - 1: - raise - # retry the loop - else: - # break the retry loop - break - except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e: - self.log.debug(e) - - perform_heartbeat( - job=self.job, - heartbeat_callback=self.heartbeat_callback, - only_if_necessary=True, - ) - # execute the tasks in the queue - for executor in self.job.executors: - executor.heartbeat() - - # If the set of tasks that aren't ready ever equals the set of - # tasks to run and there are no running tasks then the backfill - # is deadlocked - if ti_status.not_ready and ti_status.not_ready == set(ti_status.to_run) and not ti_status.running: - self.log.warning("Deadlock discovered for ti_status.to_run=%s", ti_status.to_run.values()) - ti_status.deadlocked.update(ti_status.to_run.values()) - ti_status.to_run.clear() - - for executor in self.job.executors: - # check executor state -- and expand any mapped TIs - for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state( - ti_status.running, executor, session - ): - - def to_keep(key: TaskInstanceKey) -> bool: - if key.dag_id != node.dag_id or key.task_id != node.task_id or key.run_id != run_id: - # For another Dag/Task/Run -- don't remove - return True - return 0 <= key.map_index <= max_map_index - - # remove the old unmapped TIs for node -- they have been replaced with the mapped TIs - ti_status.to_run = {key: ti for (key, ti) in ti_status.to_run.items() if to_keep(key)} - - ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis}) - - for new_ti in new_mapped_tis: - new_ti.try_number += 1 - new_ti.set_state(TaskInstanceState.SCHEDULED, session=session) - - # Set state to failed for running TIs that are set up for retry if disable-retry flag is set - for ti in ti_status.running.values(): - if self.disable_retry and ti.state == TaskInstanceState.UP_FOR_RETRY: - ti.set_state(TaskInstanceState.FAILED, session=session) - - # update the task counters - self._update_counters(ti_status=ti_status, session=session) - session.commit() - - # update dag run state - _dag_runs = ti_status.active_runs.copy() - for run in _dag_runs: - run.update_state(session=session) - if run.state in State.finished_dr_states: - ti_status.finished_runs += 1 - ti_status.active_runs.remove(run) - executed_run_dates.append(run.execution_date) - - 
self._log_progress(ti_status) - session.commit() - time.sleep(1) - - # return updated status - return executed_run_dates - - @provide_session - def _collect_errors(self, ti_status: _DagRunTaskStatus, session: Session = NEW_SESSION) -> Iterator[str]: - def tabulate_ti_keys_set(ti_keys: Iterable[TaskInstanceKey]) -> str: - # Sorting by execution date first - sorted_ti_keys: Any = sorted( - ti_keys, - key=lambda ti_key: ( - ti_key.run_id, - ti_key.dag_id, - ti_key.task_id, - ti_key.map_index, - ti_key.try_number, - ), - ) - - if all(key.map_index == -1 for key in ti_keys): - headers = ["DAG ID", "Task ID", "Run ID", "Try number"] - sorted_ti_keys = (k[0:4] for k in sorted_ti_keys) - else: - headers = ["DAG ID", "Task ID", "Run ID", "Map Index", "Try number"] - - return tabulate(sorted_ti_keys, headers=headers) - - if ti_status.failed: - yield "Some task instances failed:\n" - yield tabulate_ti_keys_set(ti_status.failed) - if ti_status.deadlocked: - yield "BackfillJob is deadlocked." - deadlocked_depends_on_past = any( - t.are_dependencies_met( - dep_context=DepContext(ignore_depends_on_past=False), - session=session, - verbose=self.verbose, - ) - != t.are_dependencies_met( - dep_context=DepContext(ignore_depends_on_past=True), session=session, verbose=self.verbose - ) - for t in ti_status.deadlocked - ) - if deadlocked_depends_on_past: - yield ( - "Some of the deadlocked tasks were unable to run because " - 'of "depends_on_past" relationships. Try running the ' - "backfill with the option " - '"ignore_first_depends_on_past=True" or passing "-I" at ' - "the command line." - ) - yield "\nThese tasks have succeeded:\n" - yield tabulate_ti_keys_set(ti_status.succeeded) - yield "\n\nThese tasks are running:\n" - yield tabulate_ti_keys_set(ti_status.running) - yield "\n\nThese tasks have failed:\n" - yield tabulate_ti_keys_set(ti_status.failed) - yield "\n\nThese tasks are skipped:\n" - yield tabulate_ti_keys_set(ti_status.skipped) - yield "\n\nThese tasks are deadlocked:\n" - yield tabulate_ti_keys_set([ti.key for ti in ti_status.deadlocked]) - - @provide_session - def _execute_dagruns( - self, - dagrun_infos: Iterable[DagRunInfo], - ti_status: _DagRunTaskStatus, - pickle_id: int | None, - start_date: datetime.datetime | None, - session: Session = NEW_SESSION, - ) -> None: - """ - Compute and execute dag runs and their respective task instances for the given dates. - - Returns a list of execution dates of the dag runs that were executed. - - :param dagrun_infos: Schedule information for dag runs - :param ti_status: internal BackfillJobRunner status structure to tis track progress - :param pickle_id: numeric id of the pickled dag, None if not pickled - :param start_date: backfill start date - :param session: the current session object - """ - for dagrun_info in dagrun_infos: - dag_run = self._get_dag_run(dagrun_info, self.dag, session=session) - if dag_run is not None: - tis_map = self._task_instances_for_dag_run(self.dag, dag_run, session=session) - ti_status.active_runs.add(dag_run) - ti_status.to_run.update(tis_map or {}) - - tis_missing_executor = [] - for ti in ti_status.to_run.values(): - if ti.executor: - try: - ExecutorLoader.lookup_executor_name_by_str(ti.executor) - except UnknownExecutorException: - tis_missing_executor.append(ti) - - if tis_missing_executor: - raise UnknownExecutorException( - "The following task instances are configured to use an executor that is not present. 
" - "Review the core.executors Airflow configuration to add it or clear the task instance to " - "clear the executor configuration for this task.\n" - + "\n".join( - [f" {ti.task_id}: {ti.run_id} (executor: {ti.executor})" for ti in tis_missing_executor] - ) - ) - processed_dag_run_dates = self._process_backfill_task_instances( - ti_status=ti_status, - pickle_id=pickle_id, - start_date=start_date, - session=session, - ) - - ti_status.executed_dag_run_dates.update(processed_dag_run_dates) - - @provide_session - def _set_unfinished_dag_runs_to_failed( - self, - dag_runs: Iterable[DagRun], - session: Session = NEW_SESSION, - ) -> None: - """ - Update the state of each dagrun based on the task_instance state and set unfinished runs to failed. - - :param dag_runs: DAG runs - :param session: session - :return: None - """ - for dag_run in dag_runs: - dag_run.update_state() - if dag_run.state not in State.finished_dr_states: - dag_run.set_state(DagRunState.FAILED) - session.merge(dag_run) - - @provide_session - def _execute(self, session: Session = NEW_SESSION) -> None: - """ - Initialize all required components of a dag for a specified date range and execute the tasks. - - :meta private: - """ - ti_status = BackfillJobRunner._DagRunTaskStatus() - - start_date = self.bf_start_date - - # Get DagRun schedule between the start/end dates, which will turn into dag runs. - dagrun_start_date = timezone.coerce_datetime(start_date) - if self.bf_end_date is None: - dagrun_end_date = pendulum.now(timezone.utc) - else: - dagrun_end_date = pendulum.instance(self.bf_end_date) - dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date)) - if self.run_backwards: - tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past] - if tasks_that_depend_on_past: - raise AirflowException( - f"You cannot backfill backwards because one or more " - f'tasks depend_on_past: {",".join(tasks_that_depend_on_past)}' - ) - dagrun_infos = dagrun_infos[::-1] - - if not dagrun_infos: - if not self.run_at_least_once: - self.log.info("No run dates were found for the given dates and dag interval.") - return - dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)] - - running_dagruns = DagRun.find( - dag_id=self.dag.dag_id, - execution_start_date=self.bf_start_date, - execution_end_date=self.bf_end_date, - no_backfills=True, - state=DagRunState.RUNNING, - ) - - if running_dagruns: - for run in running_dagruns: - self.log.error( - "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING " - "state.", - run.run_id, - run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"), - run.run_type, - ) - self.log.error( - "Changing DagRun into BACKFILL would cause scheduler to lose track of executing " - "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into " - "database would cause database constraint violation for dag_id + execution_date " - "combination. 
Please adjust backfill dates or wait for this DagRun to finish.", - ) - return - pickle_id = None - - _support_pickling = [] - - for executor in self.job.executors: - _support_pickling.append(executor.supports_pickling) - - executor.job_id = self.job.id - executor.start() - - if not self.donot_pickle and all(_support_pickling): - pickle = DagPickle(self.dag) - session.add(pickle) - session.commit() - pickle_id = pickle.id - - ti_status.total_runs = len(dagrun_infos) # total dag runs in backfill - - try: - remaining_dates = ti_status.total_runs - while remaining_dates > 0: - dagrun_infos_to_process = [ - dagrun_info - for dagrun_info in dagrun_infos - if dagrun_info.logical_date not in ti_status.executed_dag_run_dates - ] - self._execute_dagruns( - dagrun_infos=dagrun_infos_to_process, - ti_status=ti_status, - pickle_id=pickle_id, - start_date=start_date, - session=session, - ) - - remaining_dates = ti_status.total_runs - len(ti_status.executed_dag_run_dates) - err = "".join(self._collect_errors(ti_status=ti_status, session=session)) - if err: - if not self.continue_on_failures or ti_status.deadlocked: - raise BackfillUnfinished(err, ti_status) - - if remaining_dates > 0: - self.log.info( - "max_active_runs limit for dag %s has been reached " - " - waiting for other dag runs to finish", - self.dag_id, - ) - time.sleep(self.delay_on_limit_secs) - except (KeyboardInterrupt, SystemExit): - self.log.warning("Backfill terminated by user.") - - # TODO: we will need to terminate running task instances and set the - # state to failed. - self._set_unfinished_dag_runs_to_failed(ti_status.active_runs) - except OperationalError: - self.log.exception( - "Backfill job dead-locked. The job will retry the job so it is likely " - "to heal itself. If your backfill succeeds you can ignore this exception.", - ) - raise - finally: - session.commit() - for executor in self.job.executors: - executor.end() - - self.log.info("Backfill done for DAG %s. Exiting.", self.dag) - - @provide_session - def reset_state_for_orphaned_tasks( - self, - filter_by_dag_run: DagRun | None = None, - session: Session = NEW_SESSION, - ) -> int | None: - """ - Reset state of orphaned tasks. - - This function checks if there are any tasks in the dagrun (or all) that - have a schedule or queued states but are not known by the executor. If - it finds those it will reset the state to None so they will get picked - up again. The batch option is for performance reasons as the queries - are made in sequence. - - :param filter_by_dag_run: the dag_run we want to process, None if all - :return: the number of TIs reset - """ - queued_tis = [] - running_tis = [] - for executor in self.job.executors: - queued_tis.append(executor.queued_tasks) - # also consider running as the state might not have changed in the db yet - running_tis.append(executor.running) - - # Can't use an update here since it doesn't support joins. 
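        # Elaborating on the above (this portability rationale is an
        # assumption; the original comment stops at "doesn't support joins"):
        # an ORM-level UPDATE cannot carry the TaskInstance -> DagRun join
        # consistently across Airflow's supported database backends, so the
        # code below first SELECTs the candidate rows FOR UPDATE and then
        # resets each ti.state to None individually.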
- resettable_states = [TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED] - if filter_by_dag_run is None: - resettable_tis = ( - session.scalars( - select(TaskInstance) - .join(TaskInstance.dag_run) - .where( - DagRun.state == DagRunState.RUNNING, - DagRun.run_type != DagRunType.BACKFILL_JOB, - TaskInstance.state.in_(resettable_states), - ) - ) - ).all() - else: - resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states, session=session) - - tis_to_reset = [ti for ti in resettable_tis if ti.key not in queued_tis and ti.key not in running_tis] - if not tis_to_reset: - return 0 - - def query(result, items): - if not items: - return result - - filter_for_tis = TaskInstance.filter_for_tis(items) - reset_tis = session.scalars( - select(TaskInstance) - .where(filter_for_tis, TaskInstance.state.in_(resettable_states)) - .with_for_update() - ).all() - - for ti in reset_tis: - ti.state = None - session.merge(ti) - - return result + reset_tis - - reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.job.max_tis_per_query) - - task_instance_str = "\n".join(f"\t{x!r}" for x in reset_tis) - session.flush() - - self.log.info("Reset the following %s TaskInstances:\n%s", len(reset_tis), task_instance_str) - return len(reset_tis) diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 03bf92d4e3d..0c2db219ef9 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -77,8 +77,6 @@ class Job(Base, LoggingMixin): The ORM class representing Job stored in the database. Jobs are processing items with state and duration that aren't task instances. - For instance a BackfillJob is a collection of task instance runs, - but should have its own state, start and end time. """ __tablename__ = "job" @@ -117,7 +115,7 @@ class Job(Base, LoggingMixin): """ TaskInstances which have been enqueued by this Job. - Only makes sense for SchedulerJob and BackfillJob instances. + Only makes sense for SchedulerJob. """ def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs): diff --git a/airflow/models/dag.py b/airflow/models/dag.py index f5def92ea92..1dfb3b2e911 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2381,8 +2381,7 @@ def add_logger_if_needed(ti: TaskInstance): tasks = self.task_dict self.log.debug("starting dagrun") # Instead of starting a scheduler, we run the minimal loop possible to check - # for task readiness and dependency management. This is notably faster - # than creating a BackfillJob and allows us to surface logs to the user + # for task readiness and dependency management. # ``Dag.test()`` works in two different modes depending on ``use_executor``: # - if ``use_executor`` is False, runs the task locally with no executor using ``_run_task`` diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py index e6f4561d8e1..c06ef09709f 100644 --- a/airflow/models/dagpickle.py +++ b/airflow/models/dagpickle.py @@ -32,7 +32,7 @@ class DagPickle(Base): """ - Represents a version of a DAG and becomes a source of truth for a BackfillJob execution. + Represents a version of a DAG and becomes a source of truth for an execution. Dags can originate from different places (user repos, main repo, ...) and also get executed in different places (different executors). 
A pickle is a native python serialized object, diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 0373bc667bf..cad82e72b8b 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1111,7 +1111,7 @@ def notify_dagrun_state_changed(self, msg: str = ""): elif self.state == DagRunState.FAILED: get_listener_manager().hook.on_dag_run_failed(dag_run=self, msg=msg) # deliberately not notifying on QUEUED - # we can't get all the state changes on SchedulerJob, BackfillJob + # we can't get all the state changes on SchedulerJob, # or LocalTaskJob, so we don't want to "falsely advertise" we notify about that def _get_ready_tis( diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e75ad83923b..5b51bb0d24d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2640,7 +2640,7 @@ def _check_and_change_state_before_execution( :param mark_success: Don't run the task, mark its state as success :param test_mode: Doesn't record success or failure in the DB :param hostname: The hostname of the worker running the task instance. - :param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID + :param job_id: Job (LocalTaskJob / SchedulerJob) ID :param pool: specifies the pool to use to run the task instance :param external_executor_id: The identifier of the celery executor :param session: SQLAlchemy ORM Session diff --git a/airflow/ti_deps/dependencies_deps.py b/airflow/ti_deps/dependencies_deps.py index 44d6bfc5c7d..c167cdb3463 100644 --- a/airflow/ti_deps/dependencies_deps.py +++ b/airflow/ti_deps/dependencies_deps.py @@ -17,7 +17,6 @@ from __future__ import annotations from airflow.ti_deps.dependencies_states import ( - BACKFILL_QUEUEABLE_STATES, QUEUEABLE_STATES, RUNNABLE_STATES, ) @@ -48,13 +47,6 @@ TaskNotRunningDep(), } -BACKFILL_QUEUED_DEPS = { - RunnableExecDateDep(), - ValidStateDep(BACKFILL_QUEUEABLE_STATES), - DagrunRunningDep(), - TaskNotRunningDep(), -} - # TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution # in any way and could easily be modified or removed from the scheduler causing # this dependency to become outdated and incorrect. This coupling should be created diff --git a/airflow/ti_deps/dependencies_states.py b/airflow/ti_deps/dependencies_states.py index fd25d62f6d7..ebf581ab48e 100644 --- a/airflow/ti_deps/dependencies_states.py +++ b/airflow/ti_deps/dependencies_states.py @@ -42,12 +42,3 @@ QUEUEABLE_STATES = { TaskInstanceState.SCHEDULED, } - -BACKFILL_QUEUEABLE_STATES = { - # For cases like unit tests and run manually - None, - TaskInstanceState.UP_FOR_RESCHEDULE, - TaskInstanceState.UP_FOR_RETRY, - # For normal backfill cases - TaskInstanceState.SCHEDULED, -} diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst b/docs/apache-airflow/administration-and-deployment/listeners.rst index 1fca915a6f1..8ca3ed93fc0 100644 --- a/docs/apache-airflow/administration-and-deployment/listeners.rst +++ b/docs/apache-airflow/administration-and-deployment/listeners.rst @@ -34,7 +34,7 @@ Lifecycle Events - ``on_starting`` - ``before_stopping`` -Lifecycle events allow you to react to start and stop events for an Airflow ``Job``, like ``SchedulerJob`` or ``BackfillJob``. +Lifecycle events allow you to react to start and stop events for an Airflow ``Job``, like ``SchedulerJob``. 
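A minimal listener sketch for these hooks (the hook names come from the lifecycle listener spec; registration happens through a plugin's ``listeners`` attribute, and ``component`` is the job wrapper being started or stopped):

    from airflow.listeners import hookimpl

    @hookimpl
    def on_starting(component):
        # runs once as the job boots, e.g. when the scheduler starts
        print(f"starting: {type(component).__name__}")

    @hookimpl
    def before_stopping(component):
        # runs just before the job shuts down
        print(f"stopping: {type(component).__name__}")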
DagRun State Change Events -------------------------- diff --git a/docs/apache-airflow/howto/listener-plugin.rst b/docs/apache-airflow/howto/listener-plugin.rst index 7b46a9de8a9..7d5fb430fb4 100644 --- a/docs/apache-airflow/howto/listener-plugin.rst +++ b/docs/apache-airflow/howto/listener-plugin.rst @@ -44,8 +44,8 @@ Using this plugin, following events can be listened: * dag run is in running state. * dag run is in success state. * dag run is in failure state. - * on start before event like airflow job, scheduler or backfilljob - * before stop for event like airflow job, scheduler or backfilljob + * on start before event like airflow job, scheduler + * before stop for event like airflow job, scheduler Listener Registration --------------------- diff --git a/tests/core/test_impersonation_tests.py b/tests/core/test_impersonation_tests.py index cf9359c8633..8350e95a8f4 100644 --- a/tests/core/test_impersonation_tests.py +++ b/tests/core/test_impersonation_tests.py @@ -28,13 +28,10 @@ import pytest from airflow.configuration import conf -from airflow.jobs.backfill_job_runner import BackfillJobRunner -from airflow.jobs.job import Job, run_job -from airflow.models import DagBag, DagRun, TaskInstance +from airflow.models import DagBag, TaskInstance from airflow.utils.db import add_default_pool_if_not_exists from airflow.utils.state import State from airflow.utils.timezone import datetime -from airflow.utils.types import DagRunType from dev.tests_common.test_utils import db @@ -175,17 +172,14 @@ def get_dagbag(dag_folder): logger.info(dagbag.dagbag_report()) return dagbag - def run_backfill(self, dag_id, task_id): + def run_dag(self, dag_id, task_id): dag = self.dagbag.get_dag(dag_id) dag.clear() - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - run_job(job=job, execute_callable=job_runner._execute) - run_id = DagRun.generate_run_id(DagRunType.BACKFILL_JOB, execution_date=DEFAULT_DATE) - ti = TaskInstance(task=dag.get_task(task_id), run_id=run_id) - ti.refresh_from_db() + dr = dag.test(use_executor=True) + ti = TaskInstance(task=dag.get_task(task_id), run_id=dr.run_id) + ti.refresh_from_db() assert ti.state == State.SUCCESS @@ -198,14 +192,14 @@ def test_impersonation(self): """ Tests that impersonating a unix user works """ - self.run_backfill("test_impersonation", "test_impersonated_user") + self.run_dag("test_impersonation", "test_impersonated_user") def test_no_impersonation(self): """ If default_impersonation=None, tests that the job is run as the current user (which will be a sudoer) """ - self.run_backfill( + self.run_dag( "test_no_impersonation", "test_superuser", ) @@ -216,7 +210,7 @@ def test_default_impersonation(self, monkeypatch): to running as TEST_USER for a test without 'run_as_user' set. 
""" monkeypatch.setenv("AIRFLOW__CORE__DEFAULT_IMPERSONATION", TEST_USER) - self.run_backfill("test_default_impersonation", "test_deelevated_user") + self.run_dag("test_default_impersonation", "test_deelevated_user") class TestImpersonationWithCustomPythonPath(BaseImpersonationTest): @@ -233,4 +227,4 @@ def test_impersonation_custom(self, monkeypatch): """ monkeypatch.setenv("PYTHONPATH", TEST_UTILS_FOLDER) assert TEST_UTILS_FOLDER not in sys.path - self.run_backfill("impersonation_with_custom_pkg", "exec_python_fn") + self.run_dag("impersonation_with_custom_pkg", "exec_python_fn") diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py deleted file mode 100644 index 616f328ee41..00000000000 --- a/tests/jobs/test_backfill_job.py +++ /dev/null @@ -1,2094 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import datetime -import json -import logging -import threading -from collections import defaultdict -from importlib import reload -from unittest import mock -from unittest.mock import Mock, patch - -import pytest - -from airflow import settings -from airflow.exceptions import ( - AirflowException, - BackfillUnfinished, - DagConcurrencyLimitReached, - NoAvailablePoolSlot, - TaskConcurrencyLimitReached, - UnknownExecutorException, -) -from airflow.executors.debug_executor import DebugExecutor -from airflow.executors.executor_loader import ExecutorLoader -from airflow.executors.sequential_executor import SequentialExecutor -from airflow.jobs.backfill_job_runner import BackfillJobRunner -from airflow.jobs.job import Job, run_job -from airflow.listeners.listener import get_listener_manager -from airflow.models import DagBag, Pool, TaskInstance as TI -from airflow.models.dagrun import DagRun -from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.models.taskmap import TaskMap -from airflow.operators.empty import EmptyOperator -from airflow.utils import timezone -from airflow.utils.session import create_session -from airflow.utils.state import DagRunState, State, TaskInstanceState -from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.types import DagRunType -from tests.listeners import dag_listener -from tests.models import TEST_DAGS_FOLDER - -from dev.tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS -from dev.tests_common.test_utils.config import conf_vars -from dev.tests_common.test_utils.db import ( - clear_db_dags, - clear_db_pools, - clear_db_runs, - clear_db_xcom, - set_default_pool_slots, -) -from dev.tests_common.test_utils.mock_executor import MockExecutor - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - -pytestmark = [pytest.mark.db_test, 
pytest.mark.skip_if_database_isolation_mode] - -logger = logging.getLogger(__name__) - -DEFAULT_DATE = timezone.datetime(2016, 1, 1) -DEFAULT_DAG_RUN_ID = "test1" - - -@pytest.fixture(scope="module") -def dag_bag(): - return DagBag(include_examples=True) - - -class SecondaryMockExecutor(MockExecutor): - """Copy of MockExecutor class with a new name for testing with hybrid executors (which currently - disallows using the same executor concurrently)""" - - -def _mock_executor(executor=None): - if not executor: - default_executor = MockExecutor() - else: - if isinstance(executor, type): - default_executor = executor() - else: - default_executor = executor - - default_executor.name = mock.MagicMock( - alias="default_exec", - module_path=f"{default_executor.__module__}.{default_executor.__class__.__qualname__}", - ) - with mock.patch("airflow.jobs.job.Job.executors", new_callable=mock.PropertyMock) as executors_mock: - with mock.patch("airflow.jobs.job.Job.executor", new_callable=mock.PropertyMock) as executor_mock: - with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock: - executor_mock.return_value = default_executor - executors_mock.return_value = [default_executor] - # The executor is mocked, so cannot be loaded/imported. Mock load_executor and return the - # correct object for the given input executor name. - loader_mock.side_effect = lambda *x: { - ("default_exec",): default_executor, - ("default.exec.module.path",): default_executor, - (None,): default_executor, - }[x] - - # Reload the job runner so that it gets a fresh instances of the mocked executor loader - from airflow.jobs import backfill_job_runner - - reload(backfill_job_runner) - - yield default_executor - - -@pytest.mark.execution_timeout(120) -class TestBackfillJob: - @pytest.fixture - def mock_executor(self): - yield from _mock_executor() - - def _mock_executors(self): - default_executor = MockExecutor() - _default_executor = Mock(wraps=default_executor) - default_alias = "default_exec" - default_module_path = f"{default_executor.__module__}.{default_executor.__class__.__qualname__}" - _default_executor.name = mock.MagicMock(alias=default_alias, module_path=default_module_path) - - secondary_executor = SecondaryMockExecutor() - _secondary_executor = Mock(wraps=secondary_executor) - secondary_alias = "secondary_exec" - secondary_module_path = f"{secondary_executor.__module__}.{secondary_executor.__class__.__qualname__}" - _secondary_executor.name = mock.MagicMock(alias=secondary_alias, module_path=secondary_module_path) - - with mock.patch( - "airflow.jobs.job.Job.executors", new_callable=mock.PropertyMock - ) as executors_mock, mock.patch( - "airflow.jobs.job.Job.executor", new_callable=mock.PropertyMock - ) as executor_mock, mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.load_executor" - ) as loader_mock, conf_vars( - { - ( - "core", - "executor", - ): f"{default_alias}:{default_module_path},{secondary_alias}:{secondary_module_path}" - } - ): - # The executor is mocked, so cannot be loaded/imported. Mock load_executor and return the - # correct object for the given input executor name. 
- loader_mock.side_effect = lambda *x: { - (_secondary_executor.name.alias,): _secondary_executor, - (_secondary_executor.name.module_path,): _secondary_executor, - (default_alias,): _default_executor, - (default_module_path,): _default_executor, - (None,): _default_executor, - }[x] - - executor_mock.return_value = _default_executor - executors_mock.return_value = [_default_executor, _secondary_executor] - - yield (_default_executor, _secondary_executor) - - @pytest.fixture - def mock_executors(self): - yield from self._mock_executors() - - @staticmethod - def clean_db(): - clear_db_dags() - clear_db_runs() - clear_db_xcom() - clear_db_pools() - - @pytest.fixture(autouse=True) - def set_instance_attrs(self, dag_bag): - self.clean_db() - self.dagbag = dag_bag - # `airflow tasks run` relies on serialized_dag - for dag in self.dagbag.dags.values(): - SerializedDagModel.write_dag(dag) - - def _get_dummy_dag( - self, - dag_maker_fixture, - dag_id="test_dag", - pool=Pool.DEFAULT_POOL_NAME, - max_active_tis_per_dag=None, - task_id="op", - **kwargs, - ): - with dag_maker_fixture(dag_id=dag_id, schedule="@daily", **kwargs) as dag: - EmptyOperator(task_id=task_id, pool=pool, max_active_tis_per_dag=max_active_tis_per_dag) - - return dag - - def _times_called_with(self, method, class_): - count = 0 - for args in method.call_args_list: - if isinstance(args[0][0], class_): - count += 1 - return count - - def test_unfinished_dag_runs_set_to_failed(self, dag_maker): - dag = self._get_dummy_dag(dag_maker) - dag_run = dag_maker.create_dagrun(state=None) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=8), - ignore_first_depends_on_past=True, - ) - - job_runner._set_unfinished_dag_runs_to_failed([dag_run]) - dag_run.refresh_from_db() - - assert State.FAILED == dag_run.state - - def test_dag_run_with_finished_tasks_set_to_success(self, dag_maker, mock_executor): - dag = self._get_dummy_dag(dag_maker) - dag_run = dag_maker.create_dagrun(state=None) - - for ti in dag_run.get_task_instances(): - ti.set_state(State.SUCCESS) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=8), - ignore_first_depends_on_past=True, - ) - job_runner._set_unfinished_dag_runs_to_failed([dag_run]) - - dag_run.refresh_from_db() - - assert State.SUCCESS == dag_run.state - - @pytest.mark.backend("postgres", "mysql") - def test_trigger_controller_dag(self, session): - dag = self.dagbag.get_dag("example_trigger_controller_dag") - target_dag = self.dagbag.get_dag("example_trigger_target_dag") - target_dag.sync_to_db() - - target_dag_run = session.query(DagRun).filter(DagRun.dag_id == target_dag.dag_id).one_or_none() - assert target_dag_run is None - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - ignore_first_depends_on_past=True, - ) - - run_job(job=job, execute_callable=job_runner._execute) - - dag_run = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).one_or_none() - assert dag_run is not None - - task_instances_list = job_runner._task_instances_for_dag_run(dag=dag, dag_run=dag_run) - - assert task_instances_list - - @pytest.mark.backend("postgres", "mysql") - def test_backfill_multi_dates(self, mock_executor): - dag = self.dagbag.get_dag("miscellaneous_test_dag") - - end_date = DEFAULT_DATE + datetime.timedelta(days=1) - - job = Job() - executor = 
job.executor - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=end_date, - ignore_first_depends_on_past=True, - ) - - run_job(job=job, execute_callable=job_runner._execute) - expected_execution_order = [ - ("runme_0", DEFAULT_DATE), - ("runme_1", DEFAULT_DATE), - ("runme_2", DEFAULT_DATE), - ("runme_0", end_date), - ("runme_1", end_date), - ("runme_2", end_date), - ("also_run_this", DEFAULT_DATE), - ("also_run_this", end_date), - ("run_after_loop", DEFAULT_DATE), - ("run_after_loop", end_date), - ("run_this_last", DEFAULT_DATE), - ("run_this_last", end_date), - ] - actual = [(tuple(x), y) for x, y in executor.sorted_tasks] - expected = [ - ( - (dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), - (State.SUCCESS, None), - ) - for (task_id, when) in expected_execution_order - ] - assert actual == expected - session = settings.Session() - drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all() - - assert drs[0].execution_date == DEFAULT_DATE - assert drs[0].state == State.SUCCESS - assert drs[1].execution_date == DEFAULT_DATE + datetime.timedelta(days=1) - assert drs[1].state == State.SUCCESS - - dag.clear() - session.close() - - @pytest.mark.backend("postgres", "mysql") - @pytest.mark.parametrize( - "dag_id, expected_execution_order", - [ - [ - "example_branch_operator", - ( - "run_this_first", - "branching", - "branch_a", - "branch_b", - "branch_c", - "branch_d", - "follow_a", - "follow_b", - "follow_c", - "follow_d", - "join", - "branching_ext_python", - "ext_py_a", - "ext_py_b", - "ext_py_c", - "ext_py_d", - "join_ext_python", - "branching_venv", - "venv_a", - "venv_b", - "venv_c", - "venv_d", - "join_venv", - ), - ], - [ - "miscellaneous_test_dag", - ("runme_0", "runme_1", "runme_2", "also_run_this", "run_after_loop", "run_this_last"), - ], - [ - "example_skip_dag", - ( - "always_true_1", - "always_true_2", - "skip_operator_1", - "skip_operator_2", - "all_success", - "one_success", - "final_1", - "final_2", - ), - ], - ["latest_only", ("latest_only", "task1")], - ], - ) - def test_backfill_examples(self, dag_id, expected_execution_order, mock_executor): - """ - Test backfilling example dags - - Try to backfill some of the example dags. Be careful, not all dags are suitable - for doing this. For example, a dag that sleeps forever, or does not have a - schedule won't work here since you simply can't backfill them. 
- """ - dag = self.dagbag.get_dag(dag_id) - - logger.info("*** Running example DAG: %s", dag.dag_id) - job = Job() - executor = job.executor - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - ignore_first_depends_on_past=True, - ) - - run_job(job=job, execute_callable=job_runner._execute) - assert [ - ((dag_id, task_id, f"backfill__{DEFAULT_DATE.isoformat()}", 1, -1), (State.SUCCESS, None)) - for task_id in expected_execution_order - ] == executor.sorted_tasks - - def test_backfill_conf(self, dag_maker, mock_executor): - dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_conf") - dag_maker.create_dagrun(state=None) - - conf_ = json.loads("""{"key": "value"}""") - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - conf=conf_, - ) - run_job(job=job, execute_callable=job_runner._execute) - - # We ignore the first dag_run created by fixture - dr = DagRun.find( - dag_id="test_backfill_conf", execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1) - ) - - assert conf_ == dr[0].conf - - def test_backfill_respect_max_active_tis_per_dag_limit(self, dag_maker, mock_executor): - max_active_tis_per_dag = 2 - dag = self._get_dummy_dag( - dag_maker, - dag_id="test_backfill_respect_max_active_tis_per_dag_limit", - max_active_tis_per_dag=max_active_tis_per_dag, - ) - dag_maker.create_dagrun(state=None) - - job = Job() - executor = job.executor - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=7), - ) - - mock_log = Mock() - job_runner._log = mock_log - - run_job(job=job, execute_callable=job_runner._execute) - - assert len(executor.history) > 0 - - task_concurrency_limit_reached_at_least_once = False - - num_running_task_instances = 0 - for running_task_instances in executor.history: - assert len(running_task_instances) <= max_active_tis_per_dag - num_running_task_instances += len(running_task_instances) - if len(running_task_instances) == max_active_tis_per_dag: - task_concurrency_limit_reached_at_least_once = True - - assert 8 == num_running_task_instances - assert task_concurrency_limit_reached_at_least_once - - times_dag_concurrency_limit_reached_in_debug = self._times_called_with( - mock_log.debug, - DagConcurrencyLimitReached, - ) - - times_pool_limit_reached_in_debug = self._times_called_with( - mock_log.debug, - NoAvailablePoolSlot, - ) - - times_task_concurrency_limit_reached_in_debug = self._times_called_with( - mock_log.debug, - TaskConcurrencyLimitReached, - ) - - assert 0 == times_pool_limit_reached_in_debug - assert 0 == times_dag_concurrency_limit_reached_in_debug - assert times_task_concurrency_limit_reached_in_debug > 0 - - @pytest.mark.parametrize("with_max_active_tis_per_dag", [False, True]) - def test_backfill_respect_max_active_tis_per_dagrun_limit( - self, dag_maker, with_max_active_tis_per_dag, mock_executor - ): - max_active_tis_per_dag = 3 - max_active_tis_per_dagrun = 2 - kwargs = {"max_active_tis_per_dagrun": max_active_tis_per_dagrun} - if with_max_active_tis_per_dag: - kwargs["max_active_tis_per_dag"] = max_active_tis_per_dag - - with dag_maker(dag_id="test_backfill_respect_max_active_tis_per_dag_limit", schedule="@daily") as dag: - EmptyOperator.partial(task_id="task1", **kwargs).expand_kwargs([{"x": i} for i in range(10)]) - - dag_maker.create_dagrun(state=None) - - job = Job() - executor = job.executor - job_runner = 
BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        mock_log = Mock()
-        job_runner._log = mock_log
-
-        run_job(job=job, execute_callable=job_runner._execute)
-
-        assert len(executor.history) > 0
-
-        task_concurrency_limit_reached_at_least_once = False
-
-        def get_running_tis_per_dagrun(running_tis):
-            running_tis_per_dagrun_dict = defaultdict(int)
-            for running_ti in running_tis:
-                running_tis_per_dagrun_dict[running_ti[3].dag_run.id] += 1
-            return running_tis_per_dagrun_dict
-
-        num_running_task_instances = 0
-        for running_task_instances in executor.history:
-            if with_max_active_tis_per_dag:
-                assert len(running_task_instances) <= max_active_tis_per_dag
-            running_tis_per_dagrun_dict = get_running_tis_per_dagrun(running_task_instances)
-            assert all(
-                [
-                    num_running_tis <= max_active_tis_per_dagrun
-                    for num_running_tis in running_tis_per_dagrun_dict.values()
-                ]
-            )
-            num_running_task_instances += len(running_task_instances)
-            task_concurrency_limit_reached_at_least_once = (
-                task_concurrency_limit_reached_at_least_once
-                or any(
-                    [
-                        num_running_tis == max_active_tis_per_dagrun
-                        for num_running_tis in running_tis_per_dagrun_dict.values()
-                    ]
-                )
-            )
-
-        assert 80 == num_running_task_instances  # (7 backfill runs + 1 manual run) * 10 mapped tasks per run
-        assert task_concurrency_limit_reached_at_least_once
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        assert 0 == times_pool_limit_reached_in_debug
-        assert 0 == times_dag_concurrency_limit_reached_in_debug
-        assert times_task_concurrency_limit_reached_in_debug > 0
-
-    def test_backfill_respect_dag_concurrency_limit(self, dag_maker, mock_executor):
-        dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_respect_concurrency_limit")
-        dag_maker.create_dagrun(state=None)
-        dag.max_active_tasks = 2
-
-        job = Job()
-        executor = job.executor
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        mock_log = Mock()
-        job_runner._log = mock_log
-
-        run_job(job=job, execute_callable=job_runner._execute)
-
-        assert len(executor.history) > 0
-
-        concurrency_limit_reached_at_least_once = False
-
-        num_running_task_instances = 0
-
-        for running_task_instances in executor.history:
-            assert len(running_task_instances) <= dag.max_active_tasks
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == dag.max_active_tasks:
-                concurrency_limit_reached_at_least_once = True
-
-        assert 8 == num_running_task_instances
-        assert concurrency_limit_reached_at_least_once
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        assert 0 == times_pool_limit_reached_in_debug
-        assert 0 == times_task_concurrency_limit_reached_in_debug
-        assert times_dag_concurrency_limit_reached_in_debug > 0
-
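The concurrency-limit tests in this area all share one shape: run the backfill with a
MockExecutor, then walk executor.history (one entry per batch of task instances handed to the
executor) and check that some cap was never exceeded but was fully used at least once. A
minimal, self-contained sketch of that invariant check (the helper name and sample data are
illustrative, not part of the test suite):

def check_history_cap(history, cap):
    """Return (total task instances seen, whether the cap was ever fully used)."""
    total, cap_hit = 0, False
    for batch in history:
        # No batch may exceed the cap...
        assert len(batch) <= cap
        total += len(batch)
        # ...but at least one batch should saturate it, or the test proves nothing.
        cap_hit = cap_hit or len(batch) == cap
    return total, cap_hit

# Eight task instances executed two at a time, as in the daily backfills above:
total, cap_hit = check_history_cap([["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"]], cap=2)
assert total == 8 and cap_hit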
-    def test_backfill_respect_default_pool_limit(self, dag_maker, mock_executor):
-        default_pool_slots = 2
-        set_default_pool_slots(default_pool_slots)
-
-        dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_with_no_pool_limit")
-        dag_maker.create_dagrun(state=None)
-
-        job = Job()
-        executor = job.executor
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        mock_log = Mock()
-        job_runner._log = mock_log
-
-        run_job(job=job, execute_callable=job_runner._execute)
-
-        assert len(executor.history) > 0
-
-        default_pool_task_slot_count_reached_at_least_once = False
-
-        num_running_task_instances = 0
-
-        # If no pool is specified, the number of tasks running in
-        # parallel per backfill should never exceed the
-        # default_pool slot count at any point in time.
-        for running_task_instances in executor.history:
-            assert len(running_task_instances) <= default_pool_slots
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == default_pool_slots:
-                default_pool_task_slot_count_reached_at_least_once = True
-
-        assert 8 == num_running_task_instances
-        assert default_pool_task_slot_count_reached_at_least_once
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        assert 0 == times_dag_concurrency_limit_reached_in_debug
-        assert 0 == times_task_concurrency_limit_reached_in_debug
-        assert times_pool_limit_reached_in_debug > 0
-
-    def test_backfill_pool_not_found(self, dag_maker, mock_executor):
-        dag = self._get_dummy_dag(
-            dag_maker,
-            dag_id="test_backfill_pool_not_found",
-            pool="king_pool",
-        )
-        dag_maker.create_dagrun(state=None)
-
-        job = Job()
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        # The referenced pool does not exist, so running the backfill must raise.
-        with pytest.raises(AirflowException):
-            run_job(job=job, execute_callable=job_runner._execute)
-
-    def test_backfill_respect_pool_limit(self, dag_maker, mock_executor):
-        session = settings.Session()
-
-        slots = 2
-        pool = Pool(
-            pool="pool_with_two_slots",
-            slots=slots,
-            include_deferred=False,
-        )
-        session.add(pool)
-        session.commit()
-
-        dag = self._get_dummy_dag(
-            dag_maker,
-            dag_id="test_backfill_respect_pool_limit",
-            pool=pool.pool,
-        )
-        dag_maker.create_dagrun(state=None)
-
-        job = Job()
-        executor = job.executor
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        mock_log = Mock()
-        job_runner._log = mock_log
-
-        run_job(job=job, execute_callable=job_runner._execute)
-
-        assert len(executor.history) > 0
-
-        pool_was_full_at_least_once = False
-        num_running_task_instances = 0
-
-        for running_task_instances in executor.history:
-            assert len(running_task_instances) <= slots
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == slots:
-                pool_was_full_at_least_once = True
-
-        assert 8 == num_running_task_instances
-        assert pool_was_full_at_least_once
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = 
self._times_called_with( - mock_log.debug, - NoAvailablePoolSlot, - ) - - times_task_concurrency_limit_reached_in_debug = self._times_called_with( - mock_log.debug, - TaskConcurrencyLimitReached, - ) - - assert 0 == times_task_concurrency_limit_reached_in_debug - assert 0 == times_dag_concurrency_limit_reached_in_debug - assert times_pool_limit_reached_in_debug > 0 - - def test_backfill_run_rescheduled(self, dag_maker, mock_executor): - dag = self._get_dummy_dag( - dag_maker, dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1" - ) - dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - run_job(job=job, execute_callable=job_runner._execute) - - ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - ti.set_state(State.UP_FOR_RESCHEDULE) - - for _ in _mock_executor(): - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - rerun_failed_tasks=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - assert ti.state == State.SUCCESS - - def test_backfill_override_conf(self, dag_maker, mock_executor): - dag = self._get_dummy_dag( - dag_maker, dag_id="test_backfill_override_conf", task_id="test_backfill_override_conf-1" - ) - dr = dag_maker.create_dagrun( - state=None, - start_date=DEFAULT_DATE, - ) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - conf={"a": 1}, - ) - - with patch.object( - job_runner, - "_task_instances_for_dag_run", - wraps=job_runner._task_instances_for_dag_run, - ) as wrapped_task_instances_for_dag_run: - run_job(job=job, execute_callable=job_runner._execute) - dr = wrapped_task_instances_for_dag_run.call_args_list[0][0][1] - assert dr.conf == {"a": 1} - - def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog, mock_executor): - dag = self._get_dummy_dag( - dag_maker, - dag_id="test_backfill_skip_active_scheduled_dagrun", - task_id="test_backfill_skip_active_scheduled_dagrun-1", - ) - dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING, run_id=DEFAULT_DAG_RUN_ID) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - with caplog.at_level(logging.ERROR, logger="airflow.jobs.backfill_job_runner.BackfillJob"): - caplog.clear() - run_job(job=job, execute_callable=job_runner._execute) - assert "Backfill cannot be created for DagRun" in caplog.messages[0] - - ti = TI(task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - # since DAG backfill is skipped, task state should be none - assert ti.state == State.NONE - - def test_backfill_rerun_failed_tasks(self, dag_maker, mock_executor): - dag = self._get_dummy_dag( - dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1" - ) - dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - 
end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - run_job(job=job, execute_callable=job_runner._execute) - - ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - ti.set_state(State.FAILED) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - rerun_failed_tasks=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - assert ti.state == State.SUCCESS - - def test_backfill_rerun_upstream_failed_tasks(self, dag_maker, mock_executor): - with dag_maker(dag_id="test_backfill_rerun_upstream_failed", schedule="@daily") as dag: - op1 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-1") - op2 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-2") - op1.set_upstream(op2) - dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - run_job(job=job, execute_callable=job_runner._execute) - - ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - ti.set_state(State.UPSTREAM_FAILED) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - rerun_failed_tasks=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - assert ti.state == State.SUCCESS - - def test_backfill_rerun_failed_tasks_without_flag(self, dag_maker, mock_executor): - dag = self._get_dummy_dag( - dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1" - ) - dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - run_job(job=job, execute_callable=job_runner._execute) - - ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID) - ti.refresh_from_db() - ti.set_state(State.FAILED) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - rerun_failed_tasks=False, - ) - - with pytest.raises(AirflowException): - run_job(job=job, execute_callable=job_runner._execute) - - def test_backfill_retry_intermittent_failed_task(self, dag_maker, mock_executor): - with dag_maker( - dag_id="test_intermittent_failure_job", - schedule="@daily", - default_args={ - "retries": 2, - "retry_delay": datetime.timedelta(seconds=0), - }, - ) as dag: - task1 = EmptyOperator(task_id="task1") - dag_maker.create_dagrun(state=None) - - executor = mock_executor - executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=1)] = ( - State.UP_FOR_RETRY - ) - executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=2)] = ( - State.UP_FOR_RETRY - ) - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), 
- ) - run_job(job=job, execute_callable=job_runner._execute) - - def test_backfill_retry_always_failed_task(self, dag_maker, mock_executor): - with dag_maker( - dag_id="test_always_failure_job", - schedule="@daily", - default_args={ - "retries": 1, - "retry_delay": datetime.timedelta(seconds=0), - }, - ) as dag: - task1 = EmptyOperator(task_id="task1") - dr = dag_maker.create_dagrun(state=None) - - executor = mock_executor - executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=0)] = ( - State.UP_FOR_RETRY - ) - executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=1) - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - ) - with pytest.raises(BackfillUnfinished): - run_job(job=job, execute_callable=job_runner._execute) - - def test_backfill_ordered_concurrent_execute(self, dag_maker, mock_executor): - with dag_maker( - dag_id="test_backfill_ordered_concurrent_execute", - schedule="@daily", - ) as dag: - op1 = EmptyOperator(task_id="leave1") - op2 = EmptyOperator(task_id="leave2") - op3 = EmptyOperator(task_id="upstream_level_1") - op4 = EmptyOperator(task_id="upstream_level_2") - op5 = EmptyOperator(task_id="upstream_level_3") - # order randomly - op2.set_downstream(op3) - op1.set_downstream(op3) - op4.set_downstream(op5) - op3.set_downstream(op4) - runid0 = f"backfill__{DEFAULT_DATE.isoformat()}" - dag_maker.create_dagrun(run_id=runid0) - - executor = mock_executor - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=2), - ) - run_job(job=job, execute_callable=job_runner._execute) - - runid1 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()}" - runid2 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=2)).isoformat()}" - - actual = [] - for batch in executor.history: - this_batch = [] - for cmd, idx, queue, ti in batch: # noqa: B007 - key = ti.key - this_batch.append((key.task_id, key.run_id)) - actual.append(sorted(this_batch)) - assert actual == [ - [ - ("leave1", runid0), - ("leave1", runid1), - ("leave1", runid2), - ("leave2", runid0), - ("leave2", runid1), - ("leave2", runid2), - ], - [ - ("upstream_level_1", runid0), - ("upstream_level_1", runid1), - ("upstream_level_1", runid2), - ], - [ - ("upstream_level_2", runid0), - ("upstream_level_2", runid1), - ("upstream_level_2", runid2), - ], - [ - ("upstream_level_3", runid0), - ("upstream_level_3", runid1), - ("upstream_level_3", runid2), - ], - ] - - @pytest.mark.parametrize("ignore_depends_on_past", [True, False]) - def test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past( - self, ignore_depends_on_past, mock_executor - ): - dag = self.dagbag.get_dag("test_depends_on_past") - dag.clear() - run_date = DEFAULT_DATE + datetime.timedelta(days=5) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=run_date, - end_date=run_date, - ignore_first_depends_on_past=ignore_depends_on_past, - ) - run_job(job=job, execute_callable=job_runner._execute) - - run_id = f"backfill__{run_date.isoformat()}" - # ti should have succeeded - ti = TI(dag.tasks[0], run_id=run_id) - ti.refresh_from_db() - assert ti.state == State.SUCCESS - - def test_backfill_depends_on_past_backwards(self, mock_executor): - """ - Test that CLI respects -B argument and raises on interaction with depends_on_past - """ - dag_id = "test_depends_on_past" - start_date = DEFAULT_DATE + 
datetime.timedelta(days=1) - end_date = start_date + datetime.timedelta(days=1) - kwargs = dict( - start_date=start_date, - end_date=end_date, - ) - dag = self.dagbag.get_dag(dag_id) - dag.clear() - - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag, ignore_first_depends_on_past=True, **kwargs) - run_job(job=job, execute_callable=job_runner._execute) - - run_id = f"backfill__{end_date.isoformat()}" - ti = TI(dag.get_task("test_dop_task"), run_id=run_id) - ti.refresh_from_db() - # runs fine forwards - assert ti.state == State.SUCCESS - - # raises backwards - expected_msg = "You cannot backfill backwards because one or more tasks depend_on_past: test_dop_task" - - for _ in _mock_executor(): - # Mock again to get a new executor - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag, run_backwards=True, **kwargs) - with pytest.raises(AirflowException, match=expected_msg): - run_job(job=job, execute_callable=job_runner._execute) - - def _get_dag_test_max_active_limits( - self, dag_maker_fixture, dag_id="test_dag", max_active_runs=1, **kwargs - ): - with dag_maker_fixture( - dag_id=dag_id, - schedule="@hourly", - max_active_runs=max_active_runs, - **kwargs, - ) as dag: - op1 = EmptyOperator(task_id="leave1") - op2 = EmptyOperator(task_id="leave2") - op3 = EmptyOperator(task_id="upstream_level_1") - op4 = EmptyOperator(task_id="upstream_level_2") - - op1 >> op2 >> op3 - op4 >> op3 - return dag - - def test_backfill_max_limit_check_within_limit(self, dag_maker, mock_executor): - dag = self._get_dag_test_max_active_limits( - dag_maker, dag_id="test_backfill_max_limit_check_within_limit", max_active_runs=16 - ) - dag_maker.create_dagrun(state=None) - start_date = DEFAULT_DATE - datetime.timedelta(hours=1) - end_date = DEFAULT_DATE - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=start_date, - end_date=end_date, - donot_pickle=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - - dagruns = DagRun.find(dag_id=dag.dag_id) - assert 2 == len(dagruns) - assert all(run.state == State.SUCCESS for run in dagruns) - - def test_backfill_notifies_dagrun_listener(self, dag_maker, mock_executor): - dag = self._get_dummy_dag(dag_maker) - dag_run = dag_maker.create_dagrun(state=None) - dag_listener.clear() - get_listener_manager().add_listener(dag_listener) - - start_date = DEFAULT_DATE - datetime.timedelta(hours=1) - end_date = DEFAULT_DATE - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=start_date, - end_date=end_date, - donot_pickle=True, - ) - job.notification_threadpool = mock.MagicMock() - run_job(job=job, execute_callable=job_runner._execute) - - assert len(dag_listener.running) == 1 - assert len(dag_listener.success) == 1 - assert dag_listener.running[0].dag.dag_id == dag_run.dag.dag_id - assert dag_listener.running[0].run_id == dag_run.run_id - assert dag_listener.running[0].state == DagRunState.RUNNING - - assert dag_listener.success[0].dag.dag_id == dag_run.dag.dag_id - assert dag_listener.success[0].run_id == dag_run.run_id - assert dag_listener.success[0].state == DagRunState.SUCCESS - - def test_backfill_max_limit_check(self, dag_maker, mock_executor): - dag_id = "test_backfill_max_limit_check" - run_id = "test_dag_run" - start_date = DEFAULT_DATE - datetime.timedelta(hours=1) - end_date = DEFAULT_DATE - - dag_run_created_cond = threading.Condition() - - def run_backfill(cond): - cond.acquire() - # this session object is different than the one in the main thread - with create_session() as 
thread_session: - try: - dag = self._get_dag_test_max_active_limits( - dag_maker, - dag_id=dag_id, - ) - dag_maker.create_dagrun( - state=State.RUNNING, - # Existing dagrun that is not within the backfill range - run_id=run_id, - execution_date=DEFAULT_DATE + datetime.timedelta(hours=1), - ) - thread_session.commit() - cond.notify() - except Exception: - logger.exception("Exception when creating DagRun") - finally: - cond.release() - thread_session.close() - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=start_date, - end_date=end_date, - donot_pickle=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - - backfill_job_thread = threading.Thread( - target=run_backfill, name="run_backfill", args=(dag_run_created_cond,) - ) - - dag_run_created_cond.acquire() - with create_session() as session: - backfill_job_thread.start() - try: - # at this point backfill can't run since the max_active_runs has been - # reached, so it is waiting - dag_run_created_cond.wait(timeout=1.5) - dagruns = DagRun.find(dag_id=dag_id) - logger.info("The dag runs retrieved: %s", dagruns) - assert 1 == len(dagruns) - dr = dagruns[0] - assert dr.run_id == run_id - - # allow the backfill to execute - # by setting the existing dag run to SUCCESS, - # backfill will execute dag runs 1 by 1 - dr.set_state(State.SUCCESS) - session.merge(dr) - session.commit() - - backfill_job_thread.join() - - dagruns = DagRun.find(dag_id=dag_id) - assert 3 == len(dagruns) # 2 from backfill + 1 existing - assert dagruns[-1].run_id == dr.run_id - finally: - dag_run_created_cond.release() - - def test_backfill_max_limit_check_no_count_existing(self, dag_maker, mock_executor): - start_date = DEFAULT_DATE - end_date = DEFAULT_DATE - # Existing dagrun that is within the backfill range - dag = self._get_dag_test_max_active_limits( - dag_maker, dag_id="test_backfill_max_limit_check_no_count_existing" - ) - dag_maker.create_dagrun(state=None) - - job = Job() - job_runner = BackfillJobRunner( - job=job, dag=dag, start_date=start_date, end_date=end_date, donot_pickle=True - ) - run_job(job=job, execute_callable=job_runner._execute) - - # BackfillJobRunner will run since the existing DagRun does not count for the max - # active limit since it's within the backfill date range. 
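-        # In other words, a run that falls inside the backfill window is part of the
-        # backfill's own budget: it is adopted and re-run rather than treated as a
-        # competing active run.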
- dagruns = DagRun.find(dag_id=dag.dag_id) - # will only be able to run 1 (the existing one) since there's just - # one dag run slot left given the max_active_runs limit - assert 1 == len(dagruns) - assert State.SUCCESS == dagruns[0].state - - def test_backfill_max_limit_check_complete_loop(self, dag_maker, mock_executor): - dag = self._get_dag_test_max_active_limits( - dag_maker, dag_id="test_backfill_max_limit_check_complete_loop" - ) - dag_maker.create_dagrun(state=None) - start_date = DEFAULT_DATE - datetime.timedelta(hours=1) - end_date = DEFAULT_DATE - - # Given the max limit to be 1 in active dag runs, we need to run the - # backfill job 3 times - success_expected = 2 - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=start_date, - end_date=end_date, - donot_pickle=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - - success_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.SUCCESS)) - running_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)) - assert success_expected == success_dagruns - assert 0 == running_dagruns # no dag_runs in running state are left - - def test_sub_set_subdag(self, dag_maker, mock_executor): - with dag_maker( - "test_sub_set_subdag", - on_success_callback=lambda _: None, - on_failure_callback=lambda _: None, - ) as dag: - op1 = EmptyOperator(task_id="leave1") - op2 = EmptyOperator(task_id="leave2") - op3 = EmptyOperator(task_id="upstream_level_1") - op4 = EmptyOperator(task_id="upstream_level_2") - op5 = EmptyOperator(task_id="upstream_level_3") - # order randomly - op2.set_downstream(op3) - op1.set_downstream(op3) - op4.set_downstream(op5) - op3.set_downstream(op4) - - dr = dag_maker.create_dagrun(state=None) - - sub_dag = dag.partial_subset( - task_ids_or_regex="leave*", include_downstream=False, include_upstream=False - ) - job = Job() - job_runner = BackfillJobRunner(job=job, dag=sub_dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - run_job(job=job, execute_callable=job_runner._execute) - - for ti in dr.get_task_instances(): - if ti.task_id == "leave1" or ti.task_id == "leave2": - assert State.SUCCESS == ti.state - else: - assert State.NONE == ti.state - - def test_backfill_fill_blanks(self, dag_maker, mock_executor): - with dag_maker( - "test_backfill_fill_blanks", - ) as dag: - op1 = EmptyOperator(task_id="op1") - op2 = EmptyOperator(task_id="op2") - op3 = EmptyOperator(task_id="op3") - op4 = EmptyOperator(task_id="op4") - op5 = EmptyOperator(task_id="op5") - op6 = EmptyOperator(task_id="op6") - - dr = dag_maker.create_dagrun(state=None) - - session = settings.Session() - - tis = dr.get_task_instances() - for ti in tis: - if ti.task_id == op1.task_id: - ti.state = State.UP_FOR_RETRY - ti.end_date = DEFAULT_DATE - elif ti.task_id == op2.task_id: - ti.state = State.FAILED - elif ti.task_id == op3.task_id: - ti.state = State.SKIPPED - elif ti.task_id == op4.task_id: - ti.state = State.SCHEDULED - elif ti.task_id == op5.task_id: - ti.state = State.UPSTREAM_FAILED - # op6 = None - session.merge(ti) - session.commit() - session.close() - - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - with pytest.raises(AirflowException, match="Some task instances failed"): - run_job(job=job, execute_callable=job_runner._execute) - - dr.refresh_from_db() - - assert dr.state == State.FAILED - - tis = dr.get_task_instances() - for ti in tis: - if ti.task_id in (op1.task_id, op4.task_id, op6.task_id): - assert ti.state == State.SUCCESS 
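-            # op1 (UP_FOR_RETRY), op4 (SCHEDULED) and op6 (no state at all) are the
-            # "blanks" the backfill fills in; the terminal states below are left as-is,
-            # and op2's FAILED is what makes the whole run fail.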
- elif ti.task_id == op2.task_id: - assert ti.state == State.FAILED - elif ti.task_id == op3.task_id: - assert ti.state == State.SKIPPED - elif ti.task_id == op5.task_id: - assert ti.state == State.UPSTREAM_FAILED - - def test_update_counters(self, dag_maker, session): - with dag_maker(dag_id="test_manage_executor_state", start_date=DEFAULT_DATE, session=session) as dag: - task1 = EmptyOperator(task_id="dummy", owner="airflow") - dr = dag_maker.create_dagrun(state=None) - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag) - ti = TI(task1, run_id=dr.run_id) - ti.refresh_from_db() - - ti_status = BackfillJobRunner._DagRunTaskStatus() - - # Test for success - # The in-memory task key in ti_status.running contains a try_number - # that is not in sync with the DB. To test that _update_counters method - # handles this, we mark the task as running in-memory and then increase - # the try number as it would be before the raw task is executed. - # When updating the counters the in-memory key will be used which will - # match what's in the in-memory ti_status.running map. This is the same - # for skipped, failed and retry states. - ti_status.running[ti.key] = ti # Task is queued and marked as running - ti.try_number += 1 - ti.set_state(State.SUCCESS, session) # Task finishes with success state - job_runner._update_counters(ti_status=ti_status, session=session) # Update counters - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 1 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 0 - - ti_status.succeeded.clear() - - # Test for success when DB try_number is off from in-memory expectations - ti_status.running[ti.key] = ti - ti.try_number += 2 - ti.set_state(State.SUCCESS, session) - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 1 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 0 - - ti_status.succeeded.clear() - - # Test for skipped - ti_status.running[ti.key] = ti - ti.try_number += 1 - ti.set_state(State.SKIPPED, session) - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 1 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 0 - - ti_status.skipped.clear() - - # Test for failed - ti_status.running[ti.key] = ti - ti.try_number += 1 - ti.set_state(State.FAILED, session) - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 1 - assert len(ti_status.to_run) == 0 - - ti_status.failed.clear() - - # Test for retry - ti_status.running[ti.key] = ti - ti.try_number += 1 - ti.set_state(State.UP_FOR_RETRY, session) - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 1 - - ti_status.to_run.clear() - - # Test for reschedule - # Logic in taskinstance reduces the try number for a task that's been - # rescheduled (which makes sense because it's the _same_ try, but it's - # just being rescheduled to a later time). 
This now makes the in-memory - # and DB representation of the task try_number the _same_, which is unlike - # the above cases. But this is okay because the in-memory key is used. - ti_status.running[ti.key] = ti # Task queued and marked as running - ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 1 - - ti_status.to_run.clear() - - # test for none - ti.set_state(State.NONE, session) - session.merge(ti) - session.commit() - ti_status.running[ti.key] = ti - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 1 - - ti_status.to_run.clear() - - # test for scheduled - ti.set_state(State.SCHEDULED) - # Deferred tasks are put into scheduled by the triggerer - # Check that they are put into to_run - ti_status.running[ti.key] = ti - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 0 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 1 - - ti_status.to_run.clear() - # test for deferred - # if a task is deferred and it's not yet time for the triggerer - # to reschedule it, we should leave it in ti_status.running - ti.set_state(State.DEFERRED) - ti_status.running[ti.key] = ti - job_runner._update_counters(ti_status=ti_status, session=session) - assert len(ti_status.running) == 1 - assert len(ti_status.succeeded) == 0 - assert len(ti_status.skipped) == 0 - assert len(ti_status.failed) == 0 - assert len(ti_status.to_run) == 0 - session.close() - - def test_dag_dagrun_infos_between(self, dag_maker): - with dag_maker( - dag_id="dagrun_infos_between", start_date=DEFAULT_DATE, schedule="@hourly" - ) as test_dag: - EmptyOperator( - task_id="dummy", - owner="airflow", - ) - - assert [DEFAULT_DATE] == [ - info.logical_date - for info in test_dag.iter_dagrun_infos_between( - earliest=DEFAULT_DATE, - latest=DEFAULT_DATE, - ) - ] - assert [ - DEFAULT_DATE - datetime.timedelta(hours=3), - DEFAULT_DATE - datetime.timedelta(hours=2), - DEFAULT_DATE - datetime.timedelta(hours=1), - DEFAULT_DATE, - ] == [ - info.logical_date - for info in test_dag.iter_dagrun_infos_between( - earliest=DEFAULT_DATE - datetime.timedelta(hours=3), - latest=DEFAULT_DATE, - ) - ] - - def test_backfill_run_backwards(self, mock_executor): - dag = self.dagbag.get_dag("test_start_date_scheduling") - dag.clear() - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=1), - run_backwards=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - - session = settings.Session() - tis = ( - session.query(TI) - .join(TI.dag_run) - .filter(TI.dag_id == "test_start_date_scheduling" and TI.task_id == "dummy") - .order_by(DagRun.execution_date) - .all() - ) - - queued_times = [ti.queued_dttm for ti in tis] - assert queued_times == sorted(queued_times, reverse=True) - assert all(ti.state == State.SUCCESS for ti in tis) - - dag.clear() - session.close() - - def test_reset_orphaned_tasks_with_orphans(self, dag_maker): - """Create 
dagruns and ensure only ones with correct states are reset.""" - prefix = "backfill_job_test_test_reset_orphaned_tasks" - states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS] - states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE] - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - - tasks = [] - with dag_maker(dag_id=prefix) as dag: - for i in range(len(states)): - task_id = f"{prefix}_task_{i}" - task = EmptyOperator(task_id=task_id) - tasks.append(task) - - session = settings.Session() - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag) - # create dagruns - dr1 = dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID, state=State.RUNNING) - dr2 = dag.create_dagrun(run_id="test2", state=State.SUCCESS, **triggered_by_kwargs) - - # create taskinstances and set states - dr1_tis = [] - dr2_tis = [] - for task, state in zip(tasks, states): - ti1 = TI(task, run_id=dr1.run_id) - ti2 = TI(task, run_id=dr2.run_id) - ti1.refresh_from_db() - ti2.refresh_from_db() - ti1.state = state - ti2.state = state - dr1_tis.append(ti1) - dr2_tis.append(ti2) - session.merge(ti1) - session.merge(ti2) - session.commit() - - assert 2 == job_runner.reset_state_for_orphaned_tasks() - - for ti in dr1_tis + dr2_tis: - ti.refresh_from_db() - - # running dagrun should be reset - for state, ti in zip(states, dr1_tis): - if state in states_to_reset: - assert ti.state is None - else: - assert state == ti.state - - # otherwise not - for state, ti in zip(states, dr2_tis): - assert state == ti.state - - for state, ti in zip(states, dr1_tis): - ti.state = state - session.commit() - - job_runner.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session) - - # check same for dag_run version - for state, ti in zip(states, dr2_tis): - assert state == ti.state - - def test_reset_orphaned_tasks_specified_dagrun(self, session, dag_maker): - """Try to reset when we specify a dagrun and ensure nothing else is.""" - dag_id = "test_reset_orphaned_tasks_specified_dagrun" - task_id = dag_id + "_task" - with dag_maker( - dag_id=dag_id, - start_date=DEFAULT_DATE, - schedule="@daily", - session=session, - ) as dag: - EmptyOperator(task_id=task_id, dag=dag) - - job = Job() - job_runner = BackfillJobRunner(job=job, dag=dag) - # make two dagruns, only reset for one - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr1 = dag_maker.create_dagrun(state=State.SUCCESS, **triggered_by_kwargs) - dr2 = dag.create_dagrun( - run_id="test2", - state=State.RUNNING, - session=session, - **triggered_by_kwargs, - ) - ti1 = dr1.get_task_instances(session=session)[0] - ti2 = dr2.get_task_instances(session=session)[0] - ti1.state = State.SCHEDULED - ti2.state = State.SCHEDULED - - session.merge(ti1) - session.merge(ti2) - session.merge(dr1) - session.merge(dr2) - session.flush() - - num_reset_tis = job_runner.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session) - assert 1 == num_reset_tis - ti1.refresh_from_db(session=session) - ti2.refresh_from_db(session=session) - assert State.SCHEDULED == ti1.state - assert State.NONE == ti2.state - - def test_job_id_is_assigned_to_dag_run(self, dag_maker, mock_executor): - dag_id = "test_job_id_is_assigned_to_dag_run" - with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule="@daily") as dag: - EmptyOperator(task_id="dummy_task", dag=dag) - - job = Job() - job_runner = BackfillJobRunner( - job=job, dag=dag, start_date=timezone.utcnow() - 
datetime.timedelta(days=1) - ) - run_job(job=job, execute_callable=job_runner._execute) - dr: DagRun = dag.get_last_dagrun() - assert dr.creating_job_id == job.id - - def test_executor_lifecycle(self, dag_maker, mock_executors): - """Ensure that all executors go through the full lifecycle of start, heartbeat, end, etc""" - dag_id = "test_executor_lifecycle" - with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule="@daily") as dag: - EmptyOperator(task_id="dummy_task", dag=dag) - - job = Job() - job_runner = BackfillJobRunner( - job=job, dag=dag, start_date=timezone.utcnow() - datetime.timedelta(days=1) - ) - run_job(job=job, execute_callable=job_runner._execute) - - for executor_mock in mock_executors: - assert executor_mock.job_id == job.id - executor_mock.start.assert_called_once() - executor_mock.heartbeat.assert_called_once() - executor_mock.end.assert_called_once() - - def test_non_existing_executor(self, dag_maker, mock_executors): - dag_id = "test_non_existing_executor" - with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule="@daily") as dag: - EmptyOperator(task_id="dummy_task", dag=dag, executor="foobar") - - job = Job() - job_runner = BackfillJobRunner( - job=job, dag=dag, start_date=timezone.utcnow() - datetime.timedelta(days=1) - ) - # Executor "foobar" does not exist, so the Backfill job should fail to run those tasks and - # throw an UnknownExecutorException - with pytest.raises(UnknownExecutorException): - run_job(job=job, execute_callable=job_runner._execute) - - def test_hybrid_executors(self, dag_maker, mock_executors, session): - dag_id = "test_hybrid_executors" - with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule="@daily") as dag: - EmptyOperator(task_id="default_exec", dag=dag) - EmptyOperator(task_id="default_exec_explicit", dag=dag, executor=mock_executors[0].name.alias) - EmptyOperator(task_id="secondary_exec", dag=dag, executor=mock_executors[1].name.alias) - - job = Job() - job_runner = BackfillJobRunner( - job=job, dag=dag, start_date=timezone.utcnow() - datetime.timedelta(days=1) - ) - - with mock.patch("airflow.executors.executor_loader.ExecutorLoader.lookup_executor_name_by_str"): - run_job(job=job, execute_callable=job_runner._execute) - - dr = DagRun.find(dag_id=dag.dag_id, session=session)[0] - - call_list = mock_executors[0].queue_task_instance.call_args_list - assert len(call_list) == 2 - assert call_list[0].args[0].task_id == "default_exec" - assert call_list[1].args[0].task_id == "default_exec_explicit" - - call_list = mock_executors[1].queue_task_instance.call_args_list - assert len(call_list) == 1 - assert call_list[0].args[0].task_id == "secondary_exec" - - assert dr - assert dr.state == DagRunState.SUCCESS - - # Check that every task has a start and end date - for ti in dr.task_instances: - assert ti.state == TaskInstanceState.SUCCESS - assert ti.start_date is not None - assert ti.end_date is not None - - def test_backfill_has_job_id_int(self, mock_executor): - """Make sure that backfill jobs are assigned job_ids and that the job_id is an int.""" - dag = self.dagbag.get_dag("test_start_date_scheduling") - dag.clear() - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=1), - run_backwards=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - assert isinstance(job.executor.job_id, int) - - @pytest.mark.long_running - @pytest.mark.parametrize("executor", [SequentialExecutor, DebugExecutor]) - 
@pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow", "test_sensor"]) - def test_backfilling_dags(self, dag_id, executor, session): - """ - End-to-end test for backfilling dags with various executors. - - We test with multiple executors as they have different "execution environments" -- for instance - DebugExecutor runs a lot more in the same process than other Executors. - - """ - # This test needs a real executor to run, so that the `make_list` task can write out the TaskMap - for _ in _mock_executor(executor): - self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py")) - dag = self.dagbag.get_dag(dag_id) - - when = timezone.datetime(2022, 1, 1) - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=when, - end_date=when, - donot_pickle=True, - ) - run_job(job=job, execute_callable=job_runner._execute) - - dr = DagRun.find(dag_id=dag.dag_id, execution_date=when, session=session)[0] - assert dr - assert dr.state == DagRunState.SUCCESS - - # Check that every task has a start and end date - for ti in dr.task_instances: - assert ti.state == TaskInstanceState.SUCCESS - assert ti.start_date is not None - assert ti.end_date is not None - - def test_mapped_dag_pre_existing_tis(self, dag_maker, session, mock_executor): - """If the DagRun already has some mapped TIs, ensure that we re-run them successfully""" - from airflow.decorators import task - from airflow.operators.python import PythonOperator - - list_result = [[1], [2], [{"a": "b"}]] - - @task - def make_arg_lists(): - return list_result - - def consumer(value): - print(repr(value)) - - with dag_maker(session=session) as dag: - consumer_op = PythonOperator.partial(task_id="consumer", python_callable=consumer).expand( - op_args=make_arg_lists() - ) - PythonOperator.partial(task_id="consumer_literal", python_callable=consumer).expand( - op_args=[[1], [2], [3]], - ) - - dr = dag_maker.create_dagrun() - - # Create the existing mapped TIs -- this the crucial part of this test - ti = dr.get_task_instance("consumer", session=session) - ti.map_index = 0 - for map_index in range(1, 3): - ti = TI(consumer_op, run_id=dr.run_id, map_index=map_index) - session.add(ti) - ti.dag_run = dr - session.flush() - - executor = mock_executor - - ti_status = BackfillJobRunner._DagRunTaskStatus() - ti_status.active_runs.add(dr) - ti_status.to_run = {ti.key: ti for ti in dr.task_instances} - - job = Job() - job_runner = BackfillJobRunner( - job=job, - dag=dag, - start_date=dr.execution_date, - end_date=dr.execution_date, - donot_pickle=True, - ) - - executor_change_state = executor.change_state - - def on_change_state(key, state, info=None): - if key.task_id == "make_arg_lists": - session.add( - TaskMap( - length=len(list_result), - keys=None, - dag_id=key.dag_id, - run_id=key.run_id, - task_id=key.task_id, - map_index=key.map_index, - ) - ) - session.flush() - executor_change_state(key, state, info) - - with patch.object(executor, "change_state", side_effect=on_change_state): - job_runner._process_backfill_task_instances( - ti_status=ti_status, - start_date=dr.execution_date, - pickle_id=None, - session=session, - ) - assert ti_status.failed == set() - assert ti_status.succeeded == { - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=0), - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=1), - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=2), - TaskInstanceKey( - 
dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=0
-            ),
-            TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=1
-            ),
-            TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=2
-            ),
-            TaskInstanceKey(
-                dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=0, map_index=-1
-            ),
-        }
-
-    def test_mapped_dag_unexpandable(self, dag_maker, session, mock_executor):
-        with dag_maker(session=session) as dag:
-
-            @dag.task
-            def get_things():
-                return [1, 2]
-
-            @dag.task
-            def this_fails() -> None:
-                raise RuntimeError("sorry!")
-
-            @dag.task(trigger_rule=TriggerRule.ALL_DONE)
-            def consumer(a, b):
-                print(a, b)
-
-            consumer.expand(a=get_things(), b=this_fails())
-
-        when = timezone.datetime(2022, 1, 1)
-        job = Job()
-        job_runner = BackfillJobRunner(job=job, dag=dag, start_date=when, end_date=when, donot_pickle=True)
-        run_job(job=job, execute_callable=job_runner._execute)
-        (dr,) = DagRun.find(dag_id=dag.dag_id, execution_date=when, session=session)
-        assert dr.state == DagRunState.FAILED
-
-        # Check that every task ended up in the expected terminal state
-        tis = {(ti.task_id, ti.map_index): ti for ti in dr.task_instances}
-        assert len(tis) == 3
-        assert tis[("get_things", -1)].state == TaskInstanceState.SUCCESS
-        assert tis[("this_fails", -1)].state == TaskInstanceState.FAILED
-        assert tis[("consumer", -1)].state == TaskInstanceState.UPSTREAM_FAILED
-
-    def test_start_date_set_for_resetted_dagruns(self, dag_maker, session, caplog, mock_executor):
-        with dag_maker() as dag:
-            EmptyOperator(task_id="task1")
-
-        dr = dag_maker.create_dagrun()
-        dr.state = State.SUCCESS
-        session.merge(dr)
-        session.flush()
-        dag.clear()
-        job = Job()
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            donot_pickle=True,
-        )
-        run_job(job=job, execute_callable=job_runner._execute)
-
-        (dr,) = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE, session=session)
-        assert dr.start_date
-        assert f"Failed to record duration of {dr}" not in caplog.text
-
-    def test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(
-        self, dag_maker, session, mock_executor
-    ):
-        """Test that task instances are not set to scheduled when the dagrun is reset"""
-
-        with dag_maker() as dag:
-            task1 = EmptyOperator(task_id="task1")
-            task2 = EmptyOperator(task_id="task2")
-            task3 = EmptyOperator(task_id="task3")
-            task1 >> task2 >> task3
-
-        for i in range(1, 4):
-            dag_maker.create_dagrun(
-                run_id=f"test_dagrun_{i}", execution_date=DEFAULT_DATE + datetime.timedelta(days=i)
-            )
-
-        dag.clear()
-
-        job = Job()
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE + datetime.timedelta(days=1),
-            end_date=DEFAULT_DATE + datetime.timedelta(days=4),
-            donot_pickle=True,
-        )
-        for dr in DagRun.find(dag_id=dag.dag_id, session=session):
-            tasks_to_run = job_runner._task_instances_for_dag_run(dag, dr, session=session)
-            states = [ti.state for _, ti in tasks_to_run.items()]
-            assert TaskInstanceState.SCHEDULED in states
-            assert State.NONE in states
-
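The retry tests below steer the MockExecutor by scripting a terminal state per
(dag_id, task_id, run_id, try_number) key, with anything unscripted defaulting to success.
A hedged sketch of that lookup pattern, with illustrative names (Key, ScriptedResults) that
are not the MockExecutor API:

from typing import NamedTuple

class Key(NamedTuple):
    dag_id: str
    task_id: str
    run_id: str
    try_number: int

class ScriptedResults:
    """Map a task-instance key to a scripted outcome, defaulting to success."""

    def __init__(self):
        self.results = {}

    def outcome(self, key):
        # Any try that was not scripted explicitly is assumed to succeed.
        return self.results.get(key, "success")

scripted = ScriptedResults()
scripted.results[Key("d", "task1", "run1", try_number=1)] = "up_for_retry"
scripted.results[Key("d", "task1", "run1", try_number=2)] = "failed"
assert scripted.outcome(Key("d", "task1", "run1", try_number=1)) == "up_for_retry"
assert scripted.outcome(Key("d", "task1", "run1", try_number=3)) == "success"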
-    @pytest.mark.parametrize(
-        ["disable_retry", "try_number", "exception"],
-        (
-            (True, 1, BackfillUnfinished),
-            (False, 2, AirflowException),
-        ),
-    )
-    def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exception, mock_executor):
-        with dag_maker(
-            dag_id="test_disable_retry",
-            schedule="@daily",
-            default_args={
-                "retries": 2,
-                "retry_delay": datetime.timedelta(seconds=3),
-            },
-        ) as dag:
-            task1 = EmptyOperator(task_id="task1")
-        dag_run = dag_maker.create_dagrun(state=None)
-
-        executor = mock_executor
-        executor.parallelism = 16
-        executor.mock_task_results[
-            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=1)
-        ] = TaskInstanceState.UP_FOR_RETRY
-        executor.mock_task_results[
-            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=2)
-        ] = TaskInstanceState.FAILED
-
-        job = Job()
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            disable_retry=disable_retry,
-        )
-        with pytest.raises(exception):
-            run_job(job=job, execute_callable=job_runner._execute)
-        ti = dag_run.get_task_instance(task_id=task1.task_id)
-
-        assert ti.try_number == try_number
-
-        dag_run.refresh_from_db()
-
-        assert dag_run.state == DagRunState.FAILED
-
-        dag.clear()
-
-    # Quarantined issue tracked in https://github.com/apache/airflow/issues/39858
-    @pytest.mark.quarantined
-    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
-        self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
-        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
-
-        # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
-        # the original one. However, instead of using the original version of perform_heartbeat,
-        # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
-        # but it can be replaced with checking the state of the LocalTaskJob.
-        def fake_perform_heartbeat(*args, **kwargs):
-            import time
-
-            time.sleep(1)
-
-        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
-            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
-            job_runner = BackfillJobRunner(
-                job=job,
-                dag=dag,
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE,
-                rerun_failed_tasks=True,
-            )
-            with pytest.raises(BackfillUnfinished):
-                run_job(job=job, execute_callable=job_runner._execute)
-
-        dr: DagRun = dag.get_last_dagrun()
-        assert dr.state == State.FAILED
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 78c3acdce0d..18ca7c63320 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -48,7 +48,6 @@ from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.backfill_job_runner import BackfillJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
@@ -60,16 +59,16 @@ from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.pool import Pool
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.timetables.base import DataInterval
 from airflow.utils import timezone
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
 from 
airflow.utils.state import DagRunState, JobState, State, TaskInstanceState from airflow.utils.types import DagRunType -from tests.jobs.test_backfill_job import _mock_executor from tests.listeners import dag_listener from tests.listeners.test_listeners import get_listener_manager from tests.models import TEST_DAGS_FOLDER @@ -2916,22 +2915,16 @@ def test_scheduler_start_date(self, configs): # because it would take the most recent run and start from there # That behavior still exists, but now it will only do so if after the # start date - bf_exec = MockExecutor() - for _ in _mock_executor(bf_exec): - backfill_job = Job() - job_runner = BackfillJobRunner( - job=backfill_job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE - ) - run_job(job=backfill_job, execute_callable=job_runner._execute) - - # one task ran + dag.create_dagrun( + state="success", + triggered_by=DagRunTriggeredByType.TIMETABLE, + run_id="abc123", + execution_date=DEFAULT_DATE, + run_type=DagRunType.BACKFILL_JOB, + data_interval=DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)), + ) + # one task "ran" assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1 - assert [ - ( - TaskInstanceKey(dag.dag_id, "dummy", f"backfill__{DEFAULT_DATE.isoformat()}", 1), - (State.SUCCESS, None), - ), - ] == bf_exec.sorted_tasks session.commit() scheduler_job = Job(