Retry Slurm interactions more (#4869)
* Hook up grid engine batch systems to the normal retry system and add --statePollingTimeout

* Remove extra word

* Insist on understanding the Slurm states and stop if we don't

* Change how we think of REVOKED and SPECIAL_EXIT

* Add missing argument

* Import missing exception type

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
adamnovak and github-actions[bot] authored Apr 17, 2024
1 parent b27be0f commit 78fe93d
Showing 5 changed files with 95 additions and 36 deletions.
4 changes: 4 additions & 0 deletions docs/running/cliOptions.rst
@@ -208,6 +208,10 @@ levels in toil are based on priority from the logging module:
the waiting period. Only works for grid engine batch
systems such as gridengine, htcondor, torque, slurm,
and lsf.
--statePollingTimeout STATEPOLLINGTIMEOUT
Time, in seconds, to retry against a broken scheduler.
Only works for grid engine batch systems such as
gridengine, htcondor, torque, slurm, and lsf.
--batchLogsDir BATCHLOGSDIR
Directory to tell the backing batch system to log into.
Should be available on both the leader and the workers,
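For readers using Toil's Python API rather than the command line, a hedged sketch of setting the new timeout follows. The workflow skeleton follows Toil's documented quickstart pattern; the job store path and the batchSystem attribute name are assumptions, and only state_polling_timeout (whose dest name appears in the options.py diff below) comes from this commit:

    from toil.common import Toil
    from toil.job import Job

    def hello(job, message):
        # Trivial placeholder job body.
        return "Hello, %s!" % message

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("./jobstore")  # job store path is a placeholder
        options.batchSystem = "slurm"            # assumed attribute name for --batchSystem
        options.state_polling_timeout = 600      # seconds to keep retrying a broken scheduler
        with Toil(options) as toil:
            toil.start(Job.wrapJobFn(hello, "Slurm"))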
35 changes: 19 additions & 16 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -25,6 +25,7 @@
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS

logger = logging.getLogger(__name__)

@@ -64,6 +65,8 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.boss = boss
self.boss.config.statePollingWait = \
self.boss.config.statePollingWait or self.boss.getWaitDuration()
self.boss.config.state_polling_timeout = \
self.boss.config.state_polling_timeout or self.boss.config.statePollingWait * 10
self.newJobsQueue = newJobsQueue
self.updatedJobsQueue = updatedJobsQueue
self.killQueue = killQueue
@@ -175,7 +178,8 @@ def killJobs(self):
while killList:
for jobID in list(killList):
batchJobID = self.getBatchSystemID(jobID)
if self.boss.with_retries(self.getJobExitCode, batchJobID) is not None:
exit_code = self.boss.with_retries(self.getJobExitCode, batchJobID)
if exit_code is not None:
logger.debug('Adding jobID %s to killedJobsQueue', jobID)
self.killedJobsQueue.put(jobID)
killList.remove(jobID)
@@ -503,21 +507,20 @@ def sleepSeconds(self, sleeptime=1):

def with_retries(self, operation, *args, **kwargs):
"""
Call operation with args and kwargs. If one of the calls to an SGE
command fails, sleep and try again for a set number of times.
Call operation with args and kwargs. If one of the calls to a
command fails, sleep and try again.
"""
maxTries = 3
tries = 0
while True:
tries += 1
try:
return operation(*args, **kwargs)
except CalledProcessErrorStderr as err:
if tries < maxTries:
logger.error("Will retry errored operation %s, code %d: %s",
operation.__name__, err.returncode, err.stderr)
time.sleep(self.config.statePollingWait)
else:
logger.error("Failed operation %s, code %d: %s",
for attempt in old_retry(
# Don't retry more often than the state polling wait.
delays=[max(delay, self.config.statePollingWait) for delay in DEFAULT_DELAYS],
timeout=self.config.state_polling_timeout,
predicate=lambda e: isinstance(e, CalledProcessErrorStderr)
):
with attempt:
try:
return operation(*args, **kwargs)
except CalledProcessErrorStderr as err:
logger.error("Errored operation %s, code %d: %s",
operation.__name__, err.returncode, err.stderr)
# Raise up to the retry logic, which will retry until timeout
raise err
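Condensed, the new retry pattern treats old_retry as a generator of attempt context managers. A minimal standalone sketch of the same loop is below; only old_retry, DEFAULT_DELAYS, and CalledProcessErrorStderr come from the imports added above, and the function and parameter names are placeholders:

    from toil.lib.misc import CalledProcessErrorStderr
    from toil.lib.retry import old_retry, DEFAULT_DELAYS

    def call_with_scheduler_retries(operation, state_polling_wait, state_polling_timeout):
        # Retry transient scheduler failures, but never poll more often than
        # statePollingWait, and give up once statePollingTimeout has elapsed.
        for attempt in old_retry(
            delays=[max(delay, state_polling_wait) for delay in DEFAULT_DELAYS],
            timeout=state_polling_timeout,
            predicate=lambda e: isinstance(e, CalledProcessErrorStderr),
        ):
            with attempt:
                return operation()

Only CalledProcessErrorStderr is retried; any other exception escapes immediately, and once the timeout is exhausted the failing call's exception propagates to the caller.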
9 changes: 9 additions & 0 deletions src/toil/batchSystems/options.py
@@ -76,6 +76,7 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette
set_option("manualMemArgs")
set_option("run_local_jobs_on_workers")
set_option("statePollingWait")
set_option("state_polling_timeout")
set_option("batch_logs_dir")


@@ -164,6 +165,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -
"Return cached results if within the waiting period. Only works for grid "
"engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
)
parser.add_argument(
"--statePollingTimeout",
dest="state_polling_timeout",
type=int,
default=1200,
help="Time, in seconds, to retry against a broken scheduler. Only works for grid "
"engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
)
parser.add_argument(
"--batchLogsDir",
dest="batch_logs_dir",
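As an aside, the flag-to-attribute mapping can be checked with plain argparse; this tiny sketch just repeats the add_argument call from the hunk above:

    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument("--statePollingTimeout", dest="state_polling_timeout",
                        type=int, default=1200)
    args = parser.parse_args(["--statePollingTimeout", "600"])
    assert args.state_polling_timeout == 600  # camelCase flag lands on the snake_case dest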
82 changes: 62 additions & 20 deletions src/toil/batchSystems/slurm.py
@@ -16,9 +16,9 @@
import os
from argparse import ArgumentParser, _ArgumentGroup
from shlex import quote
from typing import Dict, List, Optional, Tuple, TypeVar, Union
from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union

from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
from toil.batchSystems.abstractGridEngineBatchSystem import \
AbstractGridEngineBatchSystem
from toil.batchSystems.options import OptionSetter
@@ -27,6 +27,46 @@

logger = logging.getLogger(__name__)

# We have a complete list of Slurm states. States not in one of these aren't
# allowed. See <https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES>

# If a job is in one of these states, Slurm can't run it anymore.
# We don't include states where the job is held or paused here;
# those mean it could run and needs to wait for someone to un-hold
# it, so Toil should wait for it.
#
# We map from each terminal state to the Toil-ontology exit reason.
TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
"BOOT_FAIL": BatchJobExitReason.LOST,
"CANCELLED": BatchJobExitReason.KILLED,
"COMPLETED": BatchJobExitReason.FINISHED,
"DEADLINE": BatchJobExitReason.KILLED,
"FAILED": BatchJobExitReason.FAILED,
"NODE_FAIL": BatchJobExitReason.LOST,
"OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
"PREEMPTED": BatchJobExitReason.KILLED,
"REVOKED": BatchJobExitReason.KILLED,
"SPECIAL_EXIT": BatchJobExitReason.FAILED,
"TIMEOUT": BatchJobExitReason.KILLED
}

# If a job is in one of these states, it might eventually move to a different
# state.
NONTERMINAL_STATES: Set[str] = {
"CONFIGURING",
"COMPLETING",
"PENDING",
"RUNNING",
"RESV_DEL_HOLD",
"REQUEUE_FED",
"REQUEUE_HOLD",
"REQUEUED",
"RESIZING",
"SIGNALING",
"STAGE_OUT",
"STOPPED",
"SUSPENDED"
}

class SlurmBatchSystem(AbstractGridEngineBatchSystem):

@@ -165,24 +205,6 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
"""
state, rc = status

# If a job is in one of these states, Slurm can't run it anymore.
# We don't include states where the job is held or paused here;
# those mean it could run and needs to wait for someone to un-hold
# it, so Toil should wait for it.
#
# We map from each terminal state to the Toil-ontology exit reason.
TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
"BOOT_FAIL": BatchJobExitReason.LOST,
"CANCELLED": BatchJobExitReason.KILLED,
"COMPLETED": BatchJobExitReason.FINISHED,
"DEADLINE": BatchJobExitReason.KILLED,
"FAILED": BatchJobExitReason.FAILED,
"NODE_FAIL": BatchJobExitReason.LOST,
"OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
"PREEMPTED": BatchJobExitReason.KILLED,
"TIMEOUT": BatchJobExitReason.KILLED
}

if state not in TERMINAL_STATES:
# Don't treat the job as exited yet
return None
@@ -204,6 +226,24 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
# If the code is nonzero, pass it along.
return (rc, exit_reason)

def _canonicalize_state(self, state: str) -> str:
"""
Turn a state string from Slurm into just the state token, like "CANCELLED".
"""

# Slurm will sometimes send something like "CANCELLED by 30065" in
# the state column for some reason.

state_token = state

if " " in state_token:
state_token = state.split(" ", 1)[0]

if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES:
raise RuntimeError("Toil job in unimplemented Slurm state " + state)

return state_token

def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
"""
Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`.
@@ -231,6 +271,7 @@ def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
if len(values) < 3:
continue
job_id_raw, state, exitcode = values
state = self._canonicalize_state(state)
logger.debug("%s state of job %s is %s", args[0], job_id_raw, state)
# JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps.
job_id_parts = job_id_raw.split(".")
@@ -305,6 +346,7 @@ def _getJobDetailsFromScontrol(self, job_id_list: list) -> dict:
if job_id not in job_id_list:
continue
state = job['JobState']
state = self._canonicalize_state(state)
logger.debug("%s state of job %s is %s", args[0], job_id, state)
try:
exitcode = job['ExitCode']
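To illustrate how the new module-level state tables and _canonicalize_state combine, here is a hedged restatement of the logic above; the classify_state helper is not part of the commit, but the imports are valid once this change lands since TERMINAL_STATES and NONTERMINAL_STATES are module-level:

    from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
    from toil.batchSystems.slurm import TERMINAL_STATES, NONTERMINAL_STATES

    def classify_state(state: str):
        # Strip trailing detail such as "CANCELLED by 30065" down to the bare token.
        token = state.split(" ", 1)[0]
        if token in TERMINAL_STATES:
            return TERMINAL_STATES[token]   # e.g. BatchJobExitReason.KILLED
        if token in NONTERMINAL_STATES:
            return None                     # job may still change state; keep polling
        # Unknown states are an error rather than being silently ignored.
        raise RuntimeError("Toil job in unimplemented Slurm state " + state)

    assert classify_state("CANCELLED by 30065") == BatchJobExitReason.KILLED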
1 change: 1 addition & 0 deletions src/toil/common.py
@@ -138,6 +138,7 @@ class Config:
"""The backing scheduler will be instructed, if possible, to save logs
to this directory, where the leader can read them."""
statePollingWait: int
state_polling_timeout: int
disableAutoDeployment: bool

# Core options