Aiohttp: graceful shutdown doesn't wait for aiojobs.aiohttp.atomic handler to finish within a canceled request #72

Closed
Dock1100 opened this issue Nov 11, 2018 · 6 comments


Dock1100 commented Nov 11, 2018

Suppose we have a view decorated with aiojobs.aiohttp.atomic; call it The Handler.
The Handler writes some important data to the database in multiple steps.
A client makes a request to The Handler and then cancels it (e.g. due to a poor connection).
The Handler has already started.
At the same time, we decide to update the code on the server, so we perform a graceful shutdown.
The server shuts down gracefully, but The Handler was not finished, and we lose some data.

Expected behaviour

The graceful shutdown should wait for The Handler to finish, even if it is running inside a canceled request.

Actual behaviour

The graceful shutdown does not wait for The Handler to finish if it is running inside a canceled request.

Steps to reproduce

Here is a script that reproduces the behavior (run it with the argument test, e.g. python3 main.py test). It runs multiple scenarios:

  • await_return - await slow_task, return the response
  • spawn_wait_return - spawn a new job, wait for it, return the response
  • spawn_return - spawn a new job without waiting for it, return the response
  • spawn_response_asap_wait - spawn a new job, write to the response, wait for the job to finish
  • atomic_simple - decorated with atomic, await slow_task, return the response
  • atomic_response_asap - decorated with atomic, write to the response, await slow_task

For every scenario:

  • create a server on 127.0.0.1:8080, make a request using curl, perform a graceful shutdown, print the outputs.
  • create a server on 127.0.0.1:8080, make a request using curl, cancel the request (kill curl), perform a graceful shutdown, print the outputs.

Finally, it prints the following table:

scenario                  |                normal                 |          client_early_close          
                          | s. time | c. time | started |   ended | s. time | c. time | started |   ended
             await_return |   6.16s |   5.16s |    True |    True |   1.61s |   0.61s |    True |   False
        spawn_wait_return |   6.17s |   5.16s |    True |    True |   1.61s |   0.60s |    True |   False
             spawn_return |   1.62s |   0.61s |    True |   False |   1.61s |   0.60s |    True |   False
 spawn_response_asap_wait |   1.61s |   0.61s |    True |   False |   1.61s |   0.61s |    True |   False
            atomic_simple |   6.18s |   5.17s |    True |    True |   1.62s |   0.61s |    True |   False
     atomic_response_asap |   1.61s |   0.61s |    True |   False |   1.61s |   0.61s |    True |   False

The most important columns are started, which indicates whether slow_task was started, and ended, which indicates whether slow_task finished.
As you can see, in the cases where the client closed the connection, the job was started but was terminated before it finished, even with an atomic handler.

import logging
import sys
from datetime import datetime
import subprocess, time, os, signal

from aiohttp import web
from aiohttp.web_response import StreamResponse
from aiojobs.aiohttp import setup, spawn, atomic

import asyncio


logger = logging.getLogger(__name__)

SLOW_TASK_DURATION = 5


async def slow_task():
    logger.info('slow_task started')
    await asyncio.sleep(SLOW_TASK_DURATION)
    logger.info('slow_task ended')


async def await_return(request):
    await slow_task()
    return web.Response(text='It works!!! await_return')


async def spawn_wait_return(request):
    job = await spawn(request, slow_task())
    await job.wait()
    return web.Response(text='It works!!! spawn_wait_return')


async def spawn_return(request):
    await spawn(request, slow_task())
    return web.Response(text='It works!!! spawn_return')


async def spawn_response_asap_wait(request):
    response = StreamResponse()
    job = await spawn(request, slow_task())
    await response.prepare(request)
    await response.write_eof('It works!!! spawn_response_asap_wait'.encode('utf-8'))
    await job.wait()
    return response


@atomic
async def atomic_simple(request):
    await slow_task()
    return web.Response(text='It works!!! atomic_simple')

@atomic
async def atomic_response_asap(request):
    response = StreamResponse()
    await response.prepare(request)
    await response.write_eof('It works!!! atomic_response_asap'.encode('utf-8'))
    await slow_task()
    return response


@web.middleware
async def timeit_middleware(request: web.Request, handler):
    start = datetime.now()
    response = await handler(request)
    end = datetime.now()
    logger.info('Time spent = %s' % (end - start))
    return response


functions = [
    await_return,
    spawn_wait_return,
    spawn_return,
    spawn_response_asap_wait,
    atomic_simple,
    atomic_response_asap,
]


logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(module)s %(funcName)s [%(levelname)-5.5s]  %(message)s",
    handlers=[
        logging.StreamHandler()
    ])


def add_padding_to_lines(lines, padding='\t\t\t'):
    return padding + padding.join(lines)


def shortify_timedelta(td):
    mm, ss = divmod(td.seconds, 60)
    hh, mm = divmod(mm, 60)
    s = ''
    if hh > 0:
        s += "%d:" % hh
    if mm > 0 or hh > 0:
        s += "%02d:" % mm
    if mm > 0 or hh > 0:
        s += "%02d" % ss
    else:
        s += "%d" % ss
    if td.microseconds:
        s = s + ".%d" % (td.microseconds/10000.0)  # round 2 digits
    return s + 's'


async def on_cleanup_make_message(app):
    # leave a message to verify that on_cleanup was called
    print('MY_ON_CLEANUP_MESSAGE')
    logger.info('MY_ON_CLEANUP_MESSAGE')


def create_app():
    app = web.Application(middlewares=[
        timeit_middleware,
    ])
    setup(app, close_timeout=SLOW_TASK_DURATION * 2)
    app.on_cleanup.append(on_cleanup_make_message)
    for func in functions:
        app.router.add_get('/%s' % func.__name__, func)
    return app


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print('Running server')
        app = create_app()
        web.run_app(app, host='127.0.0.1', port=8080)
    else:
        print('Running tests')
        results = {}
        MODES = ('normal', 'client_early_close')
        for func in functions:
            print('%s' % func.__name__)
            results[func.__name__] = {}
            for mode in MODES:
                print('\t%s' % mode)
                server_start_time = datetime.now()
                server_end_time = None
                server = subprocess.Popen(args=['python3', __file__],
                                        stderr=subprocess.STDOUT,
                                        stdout=subprocess.PIPE,
                                        universal_newlines=True)
                time.sleep(1)  # wait for the server to start up
                curl_start_time = datetime.now()
                curl_end_time = None
                curl = subprocess.Popen(args=['curl', '127.0.0.1:8080/%s' % func.__name__],
                                        stderr=subprocess.STDOUT,
                                        stdout=subprocess.PIPE,
                                        universal_newlines=True)
                time.sleep(0.5)  # wait for curl to start the request

                if mode == MODES[1]:  # early close
                    os.kill(curl.pid, signal.SIGTERM)  # cancel the request from the client side without awaiting the full response

                os.kill(server.pid, signal.SIGINT)  # send keyboard interrupt to make graceful shutdown
                while server.poll() is None or curl.poll() is None:  # wait for server and curl
                    if server.poll() is not None and server_end_time is None:
                        server_end_time = datetime.now()
                    if curl.poll() is not None and curl_end_time is None:
                        curl_end_time = datetime.now()
                    time.sleep(0.1)

                if server_end_time is None:
                    server_end_time = datetime.now()

                if curl_end_time is None:
                    curl_end_time = datetime.now()
                # print some logs
                server_output = add_padding_to_lines(server.stdout.readlines())
                curl_output = add_padding_to_lines(curl.stdout.readlines())
                print('\t\tServer output:%s' % server_output)
                print('\t\tCurl stdout:%s' % curl_output)
                slow_task_ended = 'slow_task ended' in server_output
                slow_task_started = 'slow_task started' in server_output
                results[func.__name__][mode] = {
                    'server_time': server_end_time - server_start_time,
                    'curl_time': curl_end_time - curl_start_time,
                    'task_started': slow_task_started,
                    'task_ended': slow_task_ended,
                }
            print('\n' * 3)
        print('{:25} | {:^37} | {:^37}'.format('scenario', *MODES))
        print('%25s | %7s | %7s | %7s | %7s | %7s | %7s | %7s | %7s' % ('',
                                                                          's. time', 'c. time', 'started', 'ended',
                                                                          's. time', 'c. time', 'started', 'ended'))
        for func in functions:
            print('%25s' % func.__name__, end='')
            for mode in MODES:
                values = results[func.__name__][mode]
                print(' | %7s | %7s | %7s | %7s' %
                      (shortify_timedelta(values['server_time']), shortify_timedelta(values['curl_time']),
                       values['task_started'], values['task_ended'])
                      , end='')
            print()
        print('*s.time - very rough server time')
        print(' c.time - very rough curl time')

Your environment

OS: macOS Mojave 10.14.2
Python 3.6.5
aiohttp==3.4.4
aiojobs==0.2.2


Dock1100 commented Nov 11, 2018

Looks like I found the root of the problem in aiojobs/_job.py at Job._close (line 75); at least with the changes below, graceful shutdown waits for tasks to finish.

    async def _close(self, timeout):
        self._closed = True
        if self._task is None:
            # the task is closed immediately without actual execution
            # it prevents a warning like
            # RuntimeWarning: coroutine 'coro' was never awaited
            self._start()
        # changes: comment out the next 2 lines and move them into `except asyncio.TimeoutError as exc`,
        # because this code would cancel the task immediately, while we need to wait at least `timeout` first
        # if not self._task.done():
        #     self._task.cancel()
        # self._scheduler is None after _done_callback()
        scheduler = self._scheduler
        try:
            with async_timeout.timeout(timeout=timeout,
                                       loop=self._loop):
                await self._task
        except asyncio.CancelledError:
            pass
        except asyncio.TimeoutError as exc:
            # changes: moved the 2 lines with the task cancellation logic here.
            # It seems the task should be canceled only after the timeout.
            if not self._task.done():
                self._task.cancel()
            if self._explicit:
                raise
            context = {'message': "Job closing timed out",
                       'job': self,
                       'exception': exc}
            if self._source_traceback is not None:
                context['source_traceback'] = self._source_traceback
            scheduler.call_exception_handler(context)
        except Exception as exc:
            if self._explicit:
                raise
            self._report_exception(exc)

With that modification, if the task was started, the server will wait for it to finish.

scenario                  |                normal                 |          client_early_close          
                          | s. time | c. time | started |   ended | s. time | c. time | started |   ended
             await_return |   6.27s |    5.6s |    True |    True |   1.72s |   0.61s |    True |   False
        spawn_wait_return |   6.27s |   5.16s |    True |    True |   6.27s |   0.61s |    True |    True
             spawn_return |   6.28s |   0.51s |    True |    True |   6.27s |   0.50s |    True |    True
 spawn_response_asap_wait |   6.27s |   0.50s |    True |    True |   6.29s |   0.53s |    True |    True
            atomic_simple |   6.27s |    5.5s |    True |    True |   6.27s |   0.60s |    True |    True
     atomic_response_asap |   6.28s |   0.51s |    True |    True |   6.27s |   0.51s |    True |    True

Dreamsorcerer (Member) commented

I think a fix would either need some explicit logic for @atomic, or maybe a new Scheduler.wait_and_close() method or similar, which actually waits on the existing jobs with a timeout before initiating a close.
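
A minimal sketch of what such a wait_and_close() could look like, written as if it were a method on aiojobs' Scheduler. It assumes the scheduler keeps its active jobs in self._jobs and already has a close() method; the names follow the public docs, but the real internals may differ:

    import asyncio

    async def wait_and_close(self, timeout=60):
        # Give every active job up to `timeout` seconds to finish on its own.
        try:
            await asyncio.wait_for(
                asyncio.gather(*(job.wait() for job in self._jobs),
                               return_exceptions=True),
                timeout)
        except asyncio.TimeoutError:
            pass  # timed out waiting; fall through and let close() cancel the rest
        await self.close()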

Personally, I'd be more interested in fixing this from the aiohttp side and having aiohttp give handlers a moment to finish processing before shutting everything down.

djmarsel commented

This issue is 5 years old already. The only reason for me to use aiojobs is to get a guarantee that a task will not be cancelled, including for requests from disconnected clients.
But this line:
self._task.cancel()
from the _job.py file
makes this impossible. Maybe someone can explain why a task caught by CancelledError should be canceled immediately at this line?


Dreamsorcerer commented Jan 16, 2023

This issue is 5 years old already.

And only 1 person cared enough to try and fix it with a PR (but, unfortunately, the change was wrong). Feel free to be the one to fix it properly (as mentioned above, a new wait_and_close() method is the one most likely to be accepted).

The only reason for me to use aiojobs is to get a guarantee that a task will not be cancelled, including for requests from disconnected clients.

aiojobs provides no guarantee of preventing cancellation. If you just want to avoid aiohttp's cancellation behaviour on disconnect, it has been removed in the current release and is exposed as a configurable option in the upcoming 3.9 release:
https://docs.aiohttp.org/en/latest/web_advanced.html#peer-disconnection
We do list aiojobs as one method to manage this behaviour in a more fine-grained way, but that only stops cancellation on disconnect; it does not guarantee that the task can't be cancelled any other way.
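
For illustration, a hedged example of the 3.9 option mentioned above, using the handler_cancellation parameter named in the linked docs (off by default in 3.9; shown enabled here to restore the old cancel-on-disconnect behaviour):

    from aiohttp import web

    app = web.Application()
    # ... register routes ...

    # With handler_cancellation=False (the 3.9 default), handlers keep
    # running after a client disconnect; setting it to True restores the
    # old behaviour of cancelling the handler when the peer goes away.
    web.run_app(app, handler_cancellation=True)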

But this line: self._task.cancel() from the _job.py file makes this impossible. Maybe someone can explain why a task caught by CancelledError should be canceled immediately at this line?

Not sure what you mean by being caught by CancelledError. That line is in the close() method, i.e. the job is being asked to close, which implies that it should cancel the task.

Dreamsorcerer (Member) commented

I'm fixing this more generally in aiohttp (PR to follow soon). The expected behaviour is that when the server is shutting down, new connections are rejected, but all existing tasks get a few seconds to continue running, which allows ongoing handlers to complete. Once the pending tasks are complete, or the timeout is reached, the application shutdown/cleanup steps are run and finally any remaining tasks are cancelled.
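
A hedged sketch from the application side, assuming the described behaviour hangs off web.run_app's existing shutdown_timeout parameter (the actual wiring is decided by the PR):

    from aiohttp import web

    app = web.Application()
    # ... register routes ...

    # shutdown_timeout is an existing run_app parameter; under the fix
    # described above, in-flight handlers get up to this long to finish
    # before remaining tasks are cancelled and cleanup/shutdown run.
    web.run_app(app, shutdown_timeout=30.0)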

The wait_and_close() method would still be a good idea generally and is tracked in #48.

Dreamsorcerer (Member) commented

aio-libs/aiohttp#7188
