Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect if the GridEngine worker thread has crashed to prevent hanging the workflow #4873

Merged
merged 12 commits into from
Apr 26, 2024
56 changes: 36 additions & 20 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import queue
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
import time
from abc import ABCMeta, abstractmethod
from datetime import datetime
Expand Down Expand Up @@ -45,20 +46,21 @@ class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
A partial implementation of BatchSystemSupport for batch systems run on a
standard HPC cluster. By default auto-deployment is not implemented.
"""
class GridEngineThreadException(Exception):
pass

class Worker(Thread, metaclass=ABCMeta):

class GridEngineThread(Thread, metaclass=ABCMeta):
def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queue, killedJobsQueue: Queue, boss: 'AbstractGridEngineBatchSystem') -> None:
"""
Abstract worker interface class. All instances are created with five
Abstract thread interface class. All instances are created with five
initial arguments (below). Note the Queue instances passed are empty.

:param newJobsQueue: a Queue of new (unsubmitted) jobs
:param updatedJobsQueue: a Queue of jobs that have been updated
:param killQueue: a Queue of active jobs that need to be killed
:param killedJobsQueue: Queue of killed jobs for this worker
:param killedJobsQueue: Queue of killed jobs for this thread
:param boss: the AbstractGridEngineBatchSystem instance that
controls this AbstractGridEngineWorker
controls this GridEngineThread

"""
Thread.__init__(self)
Expand All @@ -77,6 +79,7 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.batchJobIDs: Dict[int, str] = dict()
self._checkOnJobsCache = None
self._checkOnJobsTimestamp = None
self.exception = None

def getBatchSystemID(self, jobID: int) -> str:
"""
Expand Down Expand Up @@ -110,7 +113,7 @@ def createJobs(self, newJob: JobTuple) -> bool:
"""
Create a new job with the given attributes.

Implementation-specific; called by AbstractGridEngineWorker.run()
Implementation-specific; called by GridEngineThread.run()
"""
activity = False
# Load new job id if present:
Expand Down Expand Up @@ -146,7 +149,7 @@ def createJobs(self, newJob: JobTuple) -> bool:

def killJobs(self):
"""
Kill any running jobs within worker
Kill any running jobs within thread
"""
killList = list()
while True:
Expand Down Expand Up @@ -277,14 +280,17 @@ def run(self):
while self._runStep():
pass
except Exception as ex:
logger.error("GridEngine like batch system failure", exc_info=ex)
raise
self.exception = ex
logger.error("GridEngine like batch system failure: %s", ex)
# dont raise exception as is_alive will still be set to false,
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
# signalling exception in the thread as we expect the thread to
# always be running for the duration of the workflow

def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]]:
"""
Returns exit codes and possibly exit reasons for a list of jobs, or None if they are running.

Called by AbstractGridEngineWorker.checkOnJobs().
Called by GridEngineThread.checkOnJobs().

This is an optional part of the interface. It should raise
NotImplementedError if not actually implemented for a particular
Expand Down Expand Up @@ -345,7 +351,7 @@ def getRunningJobIDs(self):
def killJob(self, jobID):
"""
Kill specific job with the Toil job ID. Implementation-specific; called
by AbstractGridEngineWorker.killJobs()
by GridEngineThread.killJobs()

:param string jobID: Toil job ID
"""
Expand All @@ -360,7 +366,7 @@ def getJobExitCode(self, batchJobID) -> Union[int, Tuple[int, Optional[BatchJobE

If the job is not running but the exit code is not available, it
will be EXIT_STATUS_UNAVAILABLE_VALUE. Implementation-specific;
called by AbstractGridEngineWorker.checkOnJobs().
called by GridEngineThread.checkOnJobs().

The exit code will only be 0 if the job affirmatively succeeded.

Expand All @@ -379,10 +385,10 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.updatedJobsQueue = Queue()
self.killQueue = Queue()
self.killedJobsQueue = Queue()
# get the associated worker class here
self.worker = self.Worker(self.newJobsQueue, self.updatedJobsQueue,
self.killQueue, self.killedJobsQueue, self)
self.worker.start()
# get the associated thread class here
self.background_thread = self.GridEngineThread(self.newJobsQueue, self.updatedJobsQueue,
self.killQueue, self.killedJobsQueue, self)
self.background_thread.start()
self._getRunningBatchJobIDsTimestamp = None
self._getRunningBatchJobIDsCache = {}

Expand Down Expand Up @@ -428,7 +434,12 @@ def killBatchJobs(self, jobIDs):
for jobID in jobIDs:
self.killQueue.put(jobID)
while jobIDs:
killedJobId = self.killedJobsQueue.get()
try:
killedJobId = self.killedJobsQueue.get(timeout=10)
except queue.Empty:
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
if not self.background_thread.is_alive():
raise self.GridEngineThreadException("Grid engine thread failed unexpectedly") from self.background_thread.exception
continue
if killedJobId is None:
break
jobIDs.remove(killedJobId)
Expand Down Expand Up @@ -460,14 +471,19 @@ def getRunningBatchJobIDs(self):
self.config.statePollingWait):
batchIds = self._getRunningBatchJobIDsCache
else:
batchIds = self.with_retries(self.worker.getRunningJobIDs)
batchIds = self.with_retries(self.background_thread.getRunningJobIDs)
self._getRunningBatchJobIDsCache = batchIds
self._getRunningBatchJobIDsTimestamp = datetime.now()
batchIds.update(self.getRunningLocalJobIDs())
return batchIds

def getUpdatedBatchJob(self, maxWait):
local_tuple = self.getUpdatedLocalJob(0)

if not self.background_thread.is_alive():
# kill remaining jobs on the thread
self.background_thread.killJobs()
raise self.GridEngineThreadException("Unexpected GridEngineThread failure") from self.background_thread.exception
if local_tuple:
return local_tuple
else:
Expand All @@ -481,14 +497,14 @@ def getUpdatedBatchJob(self, maxWait):

def shutdown(self) -> None:
"""
Signals worker to shutdown (via sentinel) then cleanly joins the thread
Signals thread to shutdown (via sentinel) then cleanly joins the thread
"""
self.shutdownLocal()
newJobsQueue = self.newJobsQueue
self.newJobsQueue = None

newJobsQueue.put(None)
self.worker.join()
self.background_thread.join()

def setEnv(self, name, value=None):
if value and ',' in value:
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

class GridEngineBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
"""
Grid Engine-specific AbstractGridEngineWorker methods
"""
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
class HTCondorBatchSystem(AbstractGridEngineBatchSystem):
# When using HTCondor, the Schedd handles scheduling

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):

# Override the createJobs method so that we can use htcondor.Submit objects
# and so that we can get disk allocation requests and ceil the CPU request.
Expand Down
4 changes: 2 additions & 2 deletions src/toil/batchSystems/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@

class LSFBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
"""LSF specific AbstractGridEngineWorker methods."""
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
"""LSF specific GridEngineThread methods."""

def getRunningJobIDs(self):
times = {}
Expand Down
4 changes: 2 additions & 2 deletions src/toil/batchSystems/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

class SlurmBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):

def getRunningJobIDs(self):
# Should return a dictionary of Job IDs and number of seconds
Expand Down Expand Up @@ -135,7 +135,7 @@ def submitJob(self, subLine):
logger.debug("sbatch submitted job %d", result)
return result
except OSError as e:
logger.error("sbatch command failed")
logger.error(f"sbatch command failed with error: {e}")
raise e

def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]]:
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class TorqueBatchSystem(AbstractGridEngineBatchSystem):

# class-specific Worker
class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
def __init__(
self, newJobsQueue, updatedJobsQueue, killQueue, killedJobsQueue, boss
):
Expand Down
1 change: 1 addition & 0 deletions src/toil/lib/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ def old_retry(
>>> i
1
"""
timeout = timeout if timeout else DEFAULT_TIMEOUT
stxue1 marked this conversation as resolved.
Show resolved Hide resolved
if timeout > 0:
go = [ None ]

Expand Down
2 changes: 1 addition & 1 deletion src/toil/test/batchSystems/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class SlurmTest(ToilTest):

def setUp(self):
self.monkeypatch = pytest.MonkeyPatch()
self.worker = toil.batchSystems.slurm.SlurmBatchSystem.Worker(
self.worker = toil.batchSystems.slurm.SlurmBatchSystem.GridEngineThread(
newJobsQueue=Queue(),
updatedJobsQueue=Queue(),
killQueue=Queue(),
Expand Down