diff --git a/changes.d/4983.fix.md b/changes.d/4983.fix.md new file mode 100644 index 00000000000..33c5decbd1b --- /dev/null +++ b/changes.d/4983.fix.md @@ -0,0 +1 @@ +Ensure the runahead limit is recomputed when legacy "suicide-triggers" are used to prevent erroneous stall in niche cases. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 3ddf991cb9c..c87af417d28 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -877,6 +877,10 @@ def remove(self, itask, reason=None): del itask + # removing this task could nudge the runahead limit forward + if self.compute_runahead(): + self.release_runahead_tasks() + def get_tasks(self) -> List[TaskProxy]: """Return a list of task proxies in the task pool.""" # Cached list only for use internally in this method. diff --git a/tests/integration/test_compat_mode.py b/tests/integration/test_compat_mode.py new file mode 100644 index 00000000000..ca652a2b2d5 --- /dev/null +++ b/tests/integration/test_compat_mode.py @@ -0,0 +1,125 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Tests for Cylc 7 compatibility mode.""" + +from typing import TYPE_CHECKING + +from cylc.flow.cycling.integer import IntegerPoint +from cylc.flow.data_store_mgr import TASK_PROXIES + +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler + + +async def test_blocked_tasks_in_n0(flow, scheduler, run, complete): + """Ensure that tasks with no satisfied dependencies remain in the pool. + + In this example, the "recover" task is not satisfiable because its upstream + dependency "foo:failed" will never be satisfied. The unsatisfiable + "recover" task should remain in n=0 until removed/completed. + + See https://github.com/cylc/cylc-flow/issues/4983 + """ + id_ = flow( + { + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'runahead limit': 'P2', + 'dependencies': { + 'P1': { + 'graph': ''' + foo:fail => recover + foo | recover => bar + ''', + }, + }, + }, + }, + filename='suite.rc', + ) + schd: 'Scheduler' = scheduler(id_, paused_start=False, debug=True) + async with run(schd): + # the workflow should run for three cycles, then runahead stall + await complete(schd, *(f'{cycle}/bar' for cycle in range(1, 4))) + assert schd.is_stalled + + # the "blocked" recover tasks should remain in the pool + assert {t.identity for t in schd.pool.get_tasks()} == { + '1/recover', + '2/recover', + '3/recover', + '4/foo', + } + + # the "blocked" tasks should remain visible in the data store + assert { + (x.cycle_point, x.graph_depth, x.name) + for x in schd.data_store_mgr.data[schd.tokens.id][ + TASK_PROXIES + ].values() + } == { + ('1', 1, 'foo'), + ('1', 0, 'recover'), + ('1', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('2', 1, 'foo'), + ('2', 0, 'recover'), + ('2', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('3', 1, 'foo'), + ('3', 0, 'recover'), + ('3', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('4', 0, 'foo'), + ('4', 1, 'recover'), + ('4', 1, 'bar'), + } + + # remove the unsatisfiable tasks + # (i.e. manually implement a suicide trigger) + for cycle in range(1, 4): + itask = schd.pool.get_task(IntegerPoint(str(cycle)), 'recover') + schd.pool.remove(itask, 'suicide-trigger') + assert {t.identity for t in schd.pool.get_tasks()} == { + '4/foo', + '5/foo', + '6/foo', + '7/foo', + } + + # the workflow continue into the next three cycles, then stall again + # (i.e. the runahead limit should move forward after the removes) + await complete(schd, *(f'{cycle}/bar' for cycle in range(4, 7))) + assert schd.is_stalled + + assert { + (x.cycle_point, x.graph_depth, x.name) + for x in schd.data_store_mgr.data[schd.tokens.id][ + TASK_PROXIES + ].values() + } == { + ('4', 1, 'foo'), + ('4', 0, 'recover'), + ('4', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('5', 1, 'foo'), + ('5', 0, 'recover'), + ('5', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('6', 1, 'foo'), + ('6', 0, 'recover'), + ('6', 0, 'bar'), # NOTE: this should be n=2 not n=0 + ('7', 0, 'foo'), + ('7', 1, 'recover'), + ('7', 1, 'bar'), + } diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index fef15e3e3dc..34b80a25882 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -39,13 +39,13 @@ from .flow_writer import flow_config_str -def _make_src_flow(src_path, conf): +def _make_src_flow(src_path, conf, filename=WorkflowFiles.FLOW_FILE): """Construct a workflow on the filesystem""" flow_src_dir = (src_path / str(uuid1())) flow_src_dir.mkdir(parents=True, exist_ok=True) if isinstance(conf, dict): conf = flow_config_str(conf) - with open((flow_src_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file: + with open((flow_src_dir / filename), 'w+') as flow_file: flow_file.write(conf) return flow_src_dir @@ -57,6 +57,7 @@ def _make_flow( name: Optional[str] = None, id_: Optional[str] = None, defaults: Optional[bool] = True, + filename: str = WorkflowFiles.FLOW_FILE, ) -> str: """Construct a workflow on the filesystem. @@ -86,7 +87,7 @@ def _make_flow( conf.setdefault('scheduler', {}).setdefault( 'allow implicit tasks', 'True') - with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file: + with open((flow_run_dir / filename), 'w+') as flow_file: flow_file.write(flow_config_str(conf)) return id_