Skip to content

Commit

Permalink
Disable keepalives for websockets (#2701)
Browse files Browse the repository at this point in the history
* Get rid of yield_fixture

* Don't reschedule keepalive processor immediatelly to prevent 100% CPU load

* Fix #1955: Fix 100% CPU usage on HTTP GET and websocket connection just after it
  • Loading branch information
asvetlov authored Feb 2, 2018
1 parent fdd9aab commit ad2366e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Changelog

.. towncrier release notes start
2.3.10 (XXXX-XX-XX)
===================

- Fix 100% CPU usage on HTTP GET and websocket connection just after it (#1955)

2.3.9 (2018-01-16)
==================

Expand Down
12 changes: 9 additions & 3 deletions aiohttp/web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class RequestHandler(asyncio.streams.FlowControlMixin, asyncio.Protocol):
"""
_request_count = 0
_keepalive = False # keep transport open
KEEPALIVE_RESCHEDULE_DELAY = 1

def __init__(self, manager, *, loop=None,
keepalive_timeout=75, # NGINX default value is 75 secs
Expand Down Expand Up @@ -321,6 +322,9 @@ def keep_alive(self, val):
:param bool val: new state.
"""
self._keepalive = val
if self._keepalive_handle:
self._keepalive_handle.cancel()
self._keepalive_handle = None

def close(self):
"""Stop accepting new pipelinig messages and close
Expand Down Expand Up @@ -352,7 +356,7 @@ def log_exception(self, *args, **kw):
self.logger.exception(*args, **kw)

def _process_keepalive(self):
if self._force_close:
if self._force_close or not self._keepalive:
return

next = self._keepalive_time + self._keepalive_timeout
Expand All @@ -363,8 +367,10 @@ def _process_keepalive(self):
self.force_close(send_last_heartbeat=True)
return

self._keepalive_handle = self._loop.call_at(
next, self._process_keepalive)
# not all request handlers are done,
# reschedule itself to next second
self._keepalive_handle = self._loop.call_later(
self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive)

def pause_reading(self):
if not self._reading_paused:
Expand Down
2 changes: 2 additions & 0 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _post_start(self, request, protocol, writer):
request._protocol, limit=2 ** 16, loop=self._loop)
request.protocol.set_parser(WebSocketReader(
self._reader, compress=self._compress))
# disable HTTP keepalive for WebSocket
request.protocol.keep_alive(False)

def can_prepare(self, request):
if self._writer is not None:
Expand Down
22 changes: 16 additions & 6 deletions tests/test_web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aiohttp import helpers, http, streams, web


@pytest.yield_fixture
@pytest.fixture
def make_srv(loop, manager):
srv = None

Expand Down Expand Up @@ -72,12 +72,12 @@ def handle(request):
return wrapper


@pytest.yield_fixture
@pytest.fixture
def writer(srv):
return http.PayloadWriter(srv.writer, srv._loop)


@pytest.yield_fixture
@pytest.fixture
def transport(buf):
transport = mock.Mock()

Expand Down Expand Up @@ -204,7 +204,7 @@ def test_connection_made(make_srv):
assert not srv._force_close


def test_connection_made_with_keepaplive(make_srv, transport):
def test_connection_made_with_tcp_keepaplive(make_srv, transport):
srv = make_srv()

sock = mock.Mock()
Expand All @@ -214,7 +214,7 @@ def test_connection_made_with_keepaplive(make_srv, transport):
socket.SO_KEEPALIVE, 1)


def test_connection_made_without_keepaplive(make_srv):
def test_connection_made_without_tcp_keepaplive(make_srv):
srv = make_srv(tcp_keepalive=False)

sock = mock.Mock()
Expand Down Expand Up @@ -260,6 +260,15 @@ def test_srv_keep_alive(srv):
assert not srv._keepalive


def test_srv_keep_alive_disable(srv):
handle = srv._keepalive_handle = mock.Mock()

srv.keep_alive(False)
assert not srv._keepalive
assert srv._keepalive_handle is None
handle.cancel.assert_called_with()


def test_slow_request(make_srv):
with pytest.warns(DeprecationWarning):
make_srv(slow_request_timeout=0.01)
Expand Down Expand Up @@ -583,6 +592,7 @@ def test_handle_500(srv, loop, buf, transport, request_handler):
@asyncio.coroutine
def test_keep_alive(make_srv, loop, transport, ceil):
srv = make_srv(keepalive_timeout=0.05)
srv.KEEPALIVE_RESCHEDULE_DELAY = 0.1
srv.connection_made(transport)

srv.keep_alive(True)
Expand All @@ -600,7 +610,7 @@ def test_keep_alive(make_srv, loop, transport, ceil):
assert srv._keepalive_handle is not None
assert not transport.close.called

yield from asyncio.sleep(0.1, loop=loop)
yield from asyncio.sleep(0.2, loop=loop)
assert transport.close.called
assert srv._waiters[0].cancelled

Expand Down
29 changes: 29 additions & 0 deletions tests/test_web_websocket_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,32 @@ def handler(request):
yield from ws.receive()

assert cancelled


@asyncio.coroutine
def test_websocket_disable_keepalive(loop, test_client):
@asyncio.coroutine
def handler(request):
ws = web.WebSocketResponse()
if not ws.can_prepare(request):
return web.Response(text='OK')
assert request.protocol._keepalive
yield from ws.prepare(request)
assert not request.protocol._keepalive
assert not request.protocol._keepalive_handle

yield from ws.send_str('OK')
yield from ws.close()
return ws

app = web.Application()
app.router.add_route('GET', '/', handler)
client = yield from test_client(app)

resp = yield from client.get('/')
txt = yield from resp.text()
assert txt == 'OK'

ws = yield from client.ws_connect('/')
data = yield from ws.receive_str()
assert data == 'OK'

0 comments on commit ad2366e

Please sign in to comment.