Skip to content

Commit

Permalink
pool: update the runahead limit after task removal
Browse files Browse the repository at this point in the history
* Also closes #4983 (by adding a test for it)
  • Loading branch information
oliver-sanders committed Oct 17, 2024
1 parent a7f6f24 commit 6d06c6d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ def remove(self, itask, reason=None):

del itask

# removing this task could nudge the runahead limit forward
if self.compute_runahead():
self.release_runahead_tasks()

def get_tasks(self) -> List[TaskProxy]:
"""Return a list of task proxies in the task pool."""
# Cached list only for use internally in this method.
Expand Down
125 changes: 125 additions & 0 deletions tests/integration/test_compat_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for Cylc 7 compatibility mode."""

from typing import TYPE_CHECKING

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.data_store_mgr import TASK_PROXIES

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


async def test_blocked_tasks_in_n0(flow, scheduler, run, complete):
"""Ensure that tasks with no satisfied dependencies remain in the pool.
In this example, the "recover" task is not satisfiable because its upstream
dependency "foo:failed" will never be satisfied. The unsatisfiable
"recover" task should remain in n=0 until removed/completed.
See https:/cylc/cylc-flow/issues/4983
"""
id_ = flow(
{
'scheduling': {
'initial cycle point': '1',
'cycling mode': 'integer',
'runahead limit': 'P2',
'dependencies': {
'P1': {
'graph': '''
foo:fail => recover
foo | recover => bar
''',
},
},
},
},
filename='suite.rc',
)
schd: 'Scheduler' = scheduler(id_, paused_start=False, debug=True)
async with run(schd):
# the workflow should run for three cycles, then runahead stall
await complete(schd, *(f'{cycle}/bar' for cycle in range(1, 4)))
assert schd.is_stalled

# the "blocked" recover tasks should remain in the pool
assert {t.identity for t in schd.pool.get_tasks()} == {
'1/recover',
'2/recover',
'3/recover',
'4/foo',
}

# the "blocked" tasks should remain visible in the data store
assert {
(x.cycle_point, x.graph_depth, x.name)
for x in schd.data_store_mgr.data[schd.tokens.id][
TASK_PROXIES
].values()
} == {
('1', 1, 'foo'),
('1', 0, 'recover'),
('1', 0, 'bar'), # NOTE: this should be n=2 not n=0
('2', 1, 'foo'),
('2', 0, 'recover'),
('2', 0, 'bar'), # NOTE: this should be n=2 not n=0
('3', 1, 'foo'),
('3', 0, 'recover'),
('3', 0, 'bar'), # NOTE: this should be n=2 not n=0
('4', 0, 'foo'),
('4', 1, 'recover'),
('4', 1, 'bar'),
}

# remove the unsatisfiable tasks
# (i.e. manually implement a suicide trigger)
for cycle in range(1, 4):
itask = schd.pool.get_task(IntegerPoint(str(cycle)), 'recover')
schd.pool.remove(itask, 'suicide-trigger')
assert {t.identity for t in schd.pool.get_tasks()} == {
'4/foo',
'5/foo',
'6/foo',
'7/foo',
}

# the workflow continue into the next three cycles, then stall again
# (i.e. the runahead limit should move forward after the removes)
await complete(schd, *(f'{cycle}/bar' for cycle in range(4, 7)))
assert schd.is_stalled

assert {
(x.cycle_point, x.graph_depth, x.name)
for x in schd.data_store_mgr.data[schd.tokens.id][
TASK_PROXIES
].values()
} == {
('4', 1, 'foo'),
('4', 0, 'recover'),
('4', 0, 'bar'), # NOTE: this should be n=2 not n=0
('5', 1, 'foo'),
('5', 0, 'recover'),
('5', 0, 'bar'), # NOTE: this should be n=2 not n=0
('6', 1, 'foo'),
('6', 0, 'recover'),
('6', 0, 'bar'), # NOTE: this should be n=2 not n=0
('7', 0, 'foo'),
('7', 1, 'recover'),
('7', 1, 'bar'),
}
7 changes: 4 additions & 3 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
from .flow_writer import flow_config_str


def _make_src_flow(src_path, conf):
def _make_src_flow(src_path, conf, filename=WorkflowFiles.FLOW_FILE):
"""Construct a workflow on the filesystem"""
flow_src_dir = (src_path / str(uuid1()))
flow_src_dir.mkdir(parents=True, exist_ok=True)
if isinstance(conf, dict):
conf = flow_config_str(conf)
with open((flow_src_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
with open((flow_src_dir / filename), 'w+') as flow_file:
flow_file.write(conf)
return flow_src_dir

Expand All @@ -57,6 +57,7 @@ def _make_flow(
name: Optional[str] = None,
id_: Optional[str] = None,
defaults: Optional[bool] = True,
filename: str = WorkflowFiles.FLOW_FILE,
) -> str:
"""Construct a workflow on the filesystem.
Expand Down Expand Up @@ -86,7 +87,7 @@ def _make_flow(
conf.setdefault('scheduler', {}).setdefault(
'allow implicit tasks', 'True')

with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
with open((flow_run_dir / filename), 'w+') as flow_file:
flow_file.write(flow_config_str(conf))
return id_

Expand Down

0 comments on commit 6d06c6d

Please sign in to comment.