Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: update the runahead limit after task removal #6428

Open
wants to merge 1 commit into
base: 8.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/4983.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure the runahead limit is recomputed when legacy "suicide-triggers" are used, to prevent erroneous stall in niche cases.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Ensure the runahead limit is recomputed when legacy "suicide-triggers" are used to prevent erroneous stall in niche cases.
Ensure the runahead limit is recomputed when legacy "suicide-triggers" are used, to prevent erroneous stall in niche cases.

(A bit ambiguous without the comma)

4 changes: 4 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ def remove(self, itask, reason=None):

del itask

# removing this task could nudge the runahead limit forward
if self.compute_runahead():
self.release_runahead_tasks()

def get_tasks(self) -> List[TaskProxy]:
"""Return a list of task proxies in the task pool."""
# Cached list only for use internally in this method.
Expand Down
125 changes: 125 additions & 0 deletions tests/integration/test_compat_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for Cylc 7 compatibility mode."""

from typing import TYPE_CHECKING

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.data_store_mgr import TASK_PROXIES

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


async def test_blocked_tasks_in_n0(flow, scheduler, run, complete):
    """Ensure that tasks with no satisfied dependencies remain in the pool.

    In this example, the "recover" task is not satisfiable because its upstream
    dependency "foo:failed" will never be satisfied. The unsatisfiable
    "recover" task should remain in n=0 until removed/completed.

    Also exercises that removing such tasks nudges the runahead limit
    forward (so the workflow does not stall erroneously).

    See https://github.com/cylc/cylc-flow/issues/4983
    """
    # NOTE(review): the "suite.rc" filename and the legacy "dependencies"
    # section presumably put the workflow into Cylc 7 compatibility mode
    # (this module is the compat-mode test file) — confirm against the
    # config loader.
    id_ = flow(
        {
            'scheduling': {
                'initial cycle point': '1',
                'cycling mode': 'integer',
                'runahead limit': 'P2',
                'dependencies': {
                    'P1': {
                        'graph': '''
                            foo:fail => recover
                            foo | recover => bar
                        ''',
                    },
                },
            },
        },
        filename='suite.rc',
    )
    schd: 'Scheduler' = scheduler(id_, paused_start=False, debug=True)
    async with run(schd):
        # the workflow should run for three cycles, then runahead stall
        await complete(schd, *(f'{cycle}/bar' for cycle in range(1, 4)))
        assert schd.is_stalled

        # the "blocked" recover tasks should remain in the pool
        # (along with the runahead-limited 4/foo)
        assert {t.identity for t in schd.pool.get_tasks()} == {
            '1/recover',
            '2/recover',
            '3/recover',
            '4/foo',
        }

        # the "blocked" tasks should remain visible in the data store
        assert {
            (x.cycle_point, x.graph_depth, x.name)
            for x in schd.data_store_mgr.data[schd.tokens.id][
                TASK_PROXIES
            ].values()
        } == {
            ('1', 1, 'foo'),
            ('1', 0, 'recover'),
            ('1', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('2', 1, 'foo'),
            ('2', 0, 'recover'),
            ('2', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('3', 1, 'foo'),
            ('3', 0, 'recover'),
            ('3', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('4', 0, 'foo'),
            ('4', 1, 'recover'),
            ('4', 1, 'bar'),
        }

        # remove the unsatisfiable tasks
        # (i.e. manually implement a suicide trigger)
        for cycle in range(1, 4):
            itask = schd.pool.get_task(IntegerPoint(str(cycle)), 'recover')
            schd.pool.remove(itask, 'suicide-trigger')
        # only the runahead-limited foo tasks should now remain
        assert {t.identity for t in schd.pool.get_tasks()} == {
            '4/foo',
            '5/foo',
            '6/foo',
            '7/foo',
        }

        # the workflow continues into the next three cycles, then stalls again
        # (i.e. the runahead limit should move forward after the removes)
        await complete(schd, *(f'{cycle}/bar' for cycle in range(4, 7)))
        assert schd.is_stalled

        assert {
            (x.cycle_point, x.graph_depth, x.name)
            for x in schd.data_store_mgr.data[schd.tokens.id][
                TASK_PROXIES
            ].values()
        } == {
            ('4', 1, 'foo'),
            ('4', 0, 'recover'),
            ('4', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('5', 1, 'foo'),
            ('5', 0, 'recover'),
            ('5', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('6', 1, 'foo'),
            ('6', 0, 'recover'),
            ('6', 0, 'bar'),  # NOTE: this should be n=2 not n=0
            ('7', 0, 'foo'),
            ('7', 1, 'recover'),
            ('7', 1, 'bar'),
        }
7 changes: 4 additions & 3 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
from .flow_writer import flow_config_str


def _make_src_flow(src_path, conf, filename=WorkflowFiles.FLOW_FILE):
    """Construct a workflow source directory on the filesystem.

    Args:
        src_path: Parent directory under which the source dir is created.
        conf: Workflow configuration, either a raw string or a dict
            (dicts are serialised via ``flow_config_str``).
        filename: Name of the workflow config file to write
            (defaults to the standard flow file; pass e.g. ``'suite.rc'``
            for Cylc 7 compatibility-mode tests).

    Returns:
        The newly created workflow source directory.
    """
    # unique per-call directory so parallel tests cannot collide
    src_dir = src_path / str(uuid1())
    src_dir.mkdir(parents=True, exist_ok=True)
    text = flow_config_str(conf) if isinstance(conf, dict) else conf
    (src_dir / filename).write_text(text)
    return src_dir

Expand All @@ -57,6 +57,7 @@ def _make_flow(
name: Optional[str] = None,
id_: Optional[str] = None,
defaults: Optional[bool] = True,
filename: str = WorkflowFiles.FLOW_FILE,
) -> str:
"""Construct a workflow on the filesystem.

Expand Down Expand Up @@ -86,7 +87,7 @@ def _make_flow(
conf.setdefault('scheduler', {}).setdefault(
'allow implicit tasks', 'True')

with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
with open((flow_run_dir / filename), 'w+') as flow_file:
flow_file.write(flow_config_str(conf))
return id_

Expand Down
Loading