Skip to content

Commit

Permalink
Address #54 and improve interrupted shutdown (#56)
Browse files Browse the repository at this point in the history
* Initial rough sketch of the solution (includes debug messages)

The patch:
  - continuously polls all subprocesses until they terminate (within some time
    limit)
  - kills any stragglers

* Cleaned up messages. Prevent multiple CTRL-C

* Addressed comments: change pre_kill to clean and reworded messages

* Revert behavior of multiple CTRL-C (kills things more quickly)

* Minor stylistic changes to address comments

* Addressed more stylistic concerns
  • Loading branch information
romain-intel authored and savingoyal committed Dec 16, 2019
1 parent 80e4546 commit f6dd413
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
2 changes: 1 addition & 1 deletion metaflow/plugins/aws/batch/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, metadata, environment):
self.metadata = metadata
self.environment = environment
self._client = BatchClient()
atexit.register(lambda : self.job.kill() if hasattr(self, 'job') else None)
atexit.register(lambda: self.job.kill() if hasattr(self, 'job') else None)

def _command(self, code_package_url, environment, step_name, step_cli):
cmds = environment.get_package_commands(code_package_url)
Expand Down
4 changes: 1 addition & 3 deletions metaflow/plugins/aws/batch/batch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ def get_log_stream(job):
else:
yield line
except Exception as ex:
last_exc = ex
if self.is_crashed:
break
sys.stderr.write(repr(ex))
Expand All @@ -308,8 +307,7 @@ def get_log_stream(job):
def kill(self):
    """Terminate this AWS Batch job if it has not finished yet, then
    return the refreshed job status via update().
    """
    if self.is_done:
        # Nothing left to terminate; just refresh and report the state.
        return self.update()
    self._client.terminate_job(
        jobId=self._id,
        reason='Metaflow initiated job termination.')
    return self.update()


Expand Down
42 changes: 34 additions & 8 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,23 @@ def execute(self):
'by the end of flow.')

def _killall(self):
for worker in self._workers.values():
worker.kill()
# If we are here, all children have received a signal and are shutting down.
# We want to give them an opportunity to do so and then kill
live_workers = set(self._workers.values())
now = int(time.time())
self._logger('Terminating %d active tasks...' % len(live_workers),
system_msg=True, bad=True)
while live_workers and int(time.time()) - now < 5:
# While not all workers are dead and we have waited less than 5 seconds
live_workers = [worker for worker in live_workers if not worker.clean()]
if live_workers:
self._logger('Killing %d remaining tasks after having waited for %d seconds -- '
'some tasks may not exit cleanly' % (len(live_workers),
int(time.time()) - now),
system_msg=True, bad=True)
for worker in live_workers:
worker.kill()
self._logger('Flushing logs...', system_msg=True, bad=True)
# give killed workers a chance to flush their logs to datastore
for _ in range(3):
list(self._poll_workers())
Expand Down Expand Up @@ -381,7 +396,7 @@ def _poll_workers(self):
task = worker.task
if returncode:
# worker did not finish successfully
if worker.killed or\
if worker.cleaned or \
returncode == METAFLOW_EXIT_DISALLOW_RETRY:
self._logger("This failed task will not be "
"retried.", system_msg=True)
Expand Down Expand Up @@ -769,7 +784,12 @@ def __init__(self, task, max_logs_size):
self._stdout)}

self._encoding = sys.stdout.encoding or 'UTF-8'
self.killed = False
self.killed = False # Killed indicates that the task was forcibly killed
# with SIGKILL by the master process.
# A killed task is always considered cleaned
self.cleaned = False # A cleaned task is one that is shutting down and has been
# noticed by the runtime and queried for its state (whether or
                                          # not it is properly shut down)

def _launch(self):
args = CLIArgs(self.task)
Expand Down Expand Up @@ -826,16 +846,22 @@ def fds(self):
return (self._proc.stderr.fileno(),
self._proc.stdout.fileno())

def clean(self):
    """Mark this worker as shutting down and report whether its
    subprocess has actually exited.

    A force-killed worker is always considered cleaned. On the first
    call, a termination notice is stamped into every log buffer so the
    datastore logs show why the task ended.
    """
    if self.killed:
        # Killed implies cleaned: nothing more to check.
        return True
    if not self.cleaned:
        # First time the runtime notices this worker shutting down:
        # annotate the logs exactly once.
        for _fileobj, log_buffer in self._logs.values():
            log_buffer.write(b'[KILLED BY ORCHESTRATOR]\n', system_msg=True)
        self.cleaned = True
    # poll() returns None while the subprocess is still running.
    return self._proc.poll() is not None

def kill(self):
    """Force-kill this worker's subprocess.

    Last resort for workers that did not exit within the grace period;
    prefer clean() first. A killed worker is also marked as cleaned.
    """
    if not self.killed:
        try:
            # Give the process one final second to clean up after
            # itself before delivering the kill signal.
            select.poll().poll(1000)
            self._proc.kill()
        except Exception:
            # Best effort: the process may already be gone. Catch only
            # Exception (not a bare `except:`) so KeyboardInterrupt and
            # SystemExit still propagate to the runtime -- this method
            # runs on the CTRL-C shutdown path.
            pass
        self.cleaned = True
        self.killed = True

def terminate(self):
Expand Down

0 comments on commit f6dd413

Please sign in to comment.