WebSocketResponse does not throw/close when connection is abruptly cut #2309

Closed
frederikaalund opened this issue Oct 10, 2017 · 16 comments · Fixed by #8685

Comments

@frederikaalund
Contributor

Long story short

This is a continuation of #961 (with some ties to #2283). Apparently, the issue is still valid (I don't know if it was ever really fixed).

I have a single websocket handler. It does two things on connection:

  • Starts a sender task that sends N bytes over the websocket every second
  • Receives (and discards) any incoming websocket messages

I connect to this handler through a simple javascript client running in Google Chrome. Everything works as expected: I receive N bytes every second, indefinitely.

However, when the connection is abruptly cut (by disconnecting the network cable), the websocket sender task takes 15 minutes to stop (even though the heartbeat timeout is only 5 seconds).

Expected behaviour

I would expect the send_bytes operation to throw or the async for msg in ws loop to terminate/throw.

Actual behaviour

The send_bytes calls keep buffering up writes (eventually causing the process to run out of memory and crash). Furthermore, the async for msg in ws loop takes 15 minutes to terminate/throw.

Steps to reproduce

The websocket server:

#!/usr/bin/env python3.6
import aiohttp
import asyncio
import logging
from aiohttp import web

logger = logging.getLogger()
logger.setLevel(logging.INFO)

async def sender(ws):
	try:
		logging.info('sender begin')
		while True:
			# Sender never stops on disconnect for N = 10000
			N = 100000
			# Sender stops on disconnect for N = 10
			#N = 10
			logging.info(f'sending {N} bytes')
			try:
				ws.send_bytes(bytes(N))

				# Explicit drain will stall the sender on disconnect
				# but not throw (despite of the heartbeat)
				#await ws.drain()

				# The only fix is to query transport.is_closing()
				# This stops the transport as soon as the heartbeat times out
				#if ws._req.transport.is_closing():
				#	logging.info(f'sender transport is closing')
				#	break
			except Exception as error:
				logging.info(f'sender error:', exc_info=error)
			await asyncio.sleep(1.0)
	# Catch all sender Exceptions
	except Exception as error:
		logging.info('sender error: %s', type(error))
	finally:
		logging.info('sender end')

async def handler(request):
	# Note the heartbeat is set to 5 seconds
	ws = web.WebSocketResponse(heartbeat=5.0)
	await ws.prepare(request)

	# Launch send task
	sender_task = asyncio.ensure_future(sender(ws))

	# Pass any message
	logging.info('receiver begin')
	try:
		async for msg in ws:
			pass
	# Catch all receiver Exceptions
	except Exception as error:
		logging.info(f'receiver error:', exc_info=error)
	finally:
		logging.info('receiver end')

	# Cancel sender task
	sender_task.cancel()

	return ws

app = web.Application()
app.router.add_route('GET', '/', handler)
web.run_app(app, port=5006)

The javascript client (wrapped in HTML for browser use):

<html>
    <script>
        ws = new WebSocket('ws://192.168.1.111:5006/');
        ws.onmessage = (message) => console.log(message);
        ws.onerror = (error) => console.error(error);
        ws.onclose = () => console.log('closed');
    </script>
</html>

Note that this only happens for large N (around 10000). Everything works fine for, say, N = 10. I don't know why. Maybe some internal caching/buffering is causing this issue?

Note that using await ws.drain() only stalls the sender. This way, the process doesn't run out of memory but the connection is still in limbo.

The practical fix is to query ws._req.transport.is_closing() (as pointed out by ttm56p in #2283).
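For reference, a minimal sketch of that workaround inside the sender loop (see also the commented-out lines in the server above); it relies on the private _req attribute and on the aiohttp 2.x API used in this report (where send_bytes is not a coroutine), so it is a stopgap rather than a supported approach:

async def sender(ws):
    while True:
        # Bail out as soon as the underlying transport reports that it is
        # closing; this flips shortly after the heartbeat times out.
        transport = ws._req.transport
        if transport is None or transport.is_closing():
            logging.info('sender transport is closing')
            break
        ws.send_bytes(bytes(100000))
        await asyncio.sleep(1.0)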

I think that aiohttp should handle this internally by whatever means necessary.

It took me a long time to track a memory leak down to this issue. The writes just kept buffering up. Note that this also happens if you simply let a browser tab sit idle for a long time (apparently, Chrome stops sending websocket data after a period of inactivity in the tab). In practice, I saw my websocket servers crashing every now and then.

Your environment

Aiohttp 2.2.3 and Python 3.6.1. Google Chrome 61.0.

@s-kostyuk

At least it logs errors in DEBUG mode (actually, two errors, one of them with a dot).

@kerma
Contributor

kerma commented Sep 5, 2018

No progress on this? Is there a workaround one could use? The current situation makes websockets unusable in cloud environments, where connection cuts are a pretty common thing. I see this happening quite often in AWS.

@asvetlov
Member

asvetlov commented Sep 5, 2018

The workaround (actually the proper solution) is to send ping messages from the client.
Unfortunately, browsers don't support WebSocket pings, so you have to emulate them with small regular messages.
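To sketch what the server side of that emulated ping could look like: the handler below treats prolonged silence from the client as a dead connection. The 10-second window and the expectation that the client sends some small keepalive message within it are assumptions for illustration, not aiohttp API requirements:

import asyncio
from aiohttp import web, WSMsgType

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        while True:
            # The client is expected to send a small "keepalive" message at
            # least every 10 seconds; silence means the peer is probably gone.
            msg = await ws.receive(timeout=10.0)
            if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.ERROR):
                break
            # handle real messages here
    except asyncio.TimeoutError:
        pass  # no traffic within the window: assume the connection is dead
    finally:
        await ws.close()
    return ws

The browser side would then simply send that small message on a timer.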

@s-kostyuk

s-kostyuk commented Sep 5, 2018 via email

@asvetlov
Member

asvetlov commented Sep 5, 2018

Sure.

  1. There is no guarantee of instant disconnection detection. Depending on the network, the event may come two hours after the peer closes the socket.
  2. aiohttp detects disconnections and handles them by cancelling the web-handler task. The web-handler can catch asyncio.CancelledError to process this situation (see the sketch below).
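To illustrate point 2, a minimal sketch of a web-handler that catches the cancellation (the handler body is illustrative, not taken from this thread):

import asyncio
from aiohttp import web

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        async for msg in ws:
            pass  # process incoming messages here
    except asyncio.CancelledError:
        # aiohttp cancels the web-handler task once it notices the peer is
        # gone; do any cleanup here, then re-raise so the cancellation
        # propagates normally.
        raise
    finally:
        await ws.close()
    return ws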

@frederikaalund
Contributor Author

There is no guarantee of instant disconnection detection. Depending on the network, the event may come two hours after the peer closes the socket.

That is understandable. What seems strange to me is that aiohttp apparently can detect that something is wrong, since ws._req.transport.is_closing() returns True after an abrupt disconnect. Can't we use that information to raise CancelledError or the like?

@louisabraham

louisabraham commented May 17, 2020

Hi, is there currently any way to reliably detect disconnections other than doing a manual ping?

I observe some "socket.send() raised exception." messages in my console when sending to a closed websocket. Is there a way to catch them?
Also, if send_str is able to know the connection is closed, why doesn't receive() raise an exception?

I just don't understand why TCP sockets detect the disconnection but websockets don't in this implementation, even with the heartbeat parameter.

@louisabraham

I think I isolated instances of such connections:

Traceback (most recent call last):
  Code from my application:
    msg = await ws.receive(timeout)
  File "/home/pi/.local/lib/python3.7/site-packages/aiohttp/web_ws.py", line 384, in receive
    msg = await self._reader.read()
  File "/home/pi/.local/lib/python3.7/site-packages/aiohttp/streams.py", line 631, in read
    return await super().read()
  File "/home/pi/.local/lib/python3.7/site-packages/aiohttp/streams.py", line 591, in read
    await self._waiter
concurrent.futures._base.CancelledError

I was only catching asyncio.TimeoutError but it looks like other errors can happen as well.
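A hedged sketch of the broadened error handling around that receive call (the helper name is mine; on Python 3.7, asyncio.CancelledError is the same class as the concurrent.futures one shown in the traceback above):

import asyncio

async def read_one(ws, timeout):
    # Return the next message, or None if nothing arrived within the timeout.
    try:
        return await ws.receive(timeout=timeout)
    except asyncio.TimeoutError:
        return None
    except asyncio.CancelledError:
        # The connection was torn down while we were waiting; re-raise unless
        # you deliberately want to swallow the cancellation.
        raise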

@czepiec

czepiec commented May 12, 2021

I have exactly the same issue with the WebSocket client. When I drop the connection (by the firewall or simply by turning off WiFi) while awaiting the receive() method of a ClientWebSocketResponse instance (and there is nothing to receive), the method only returns after 15 minutes. Heartbeat is enabled.

It can be reproduced with this code:

from asyncio import run
from aiohttp import ClientSession

async def main():
    async with ClientSession() as session:
        async with session.ws_connect('https://echo.websocket.org/',
                                      heartbeat=10) as ws:
            print('Connected', ws)
            print('Received', await ws.receive())

run(main())

ws.receive() returns CLOSED only 15 minutes (15:30 to 16:10 in my tests) after the connection is lost. async for msg in ws: behaves the same way; the generator ends after 15 minutes. I would expect the maximum time to be the sum of the heartbeat and the timeout (or am I wrong?).
But it is interesting that when I look at ws.closed asynchronously, it is correctly set to True after 10-20 seconds.
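Based on that observation, one possible workaround is a small watchdog that cancels the pending receive() once ws.closed flips to True; the helper name and the one-second polling interval are arbitrary choices, not aiohttp API:

import asyncio

async def receive_with_watchdog(ws):
    # Wake the caller (with CancelledError) as soon as ws.closed becomes True,
    # instead of waiting ~15 minutes for the TCP stack to give up.
    receive_task = asyncio.ensure_future(ws.receive())

    async def watchdog():
        while not ws.closed:
            await asyncio.sleep(1.0)
        receive_task.cancel()

    watchdog_task = asyncio.ensure_future(watchdog())
    try:
        return await receive_task
    finally:
        watchdog_task.cancel()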

My environment is Python 3.8.3 and aiohttp 3.7.4.post0.

@schlamar

The state machine for websockets is somehow fundamentally broken in aiohttp. I don't get why this has been ignored for almost 4 years now. It's the reason I ultimately switched away from aiohttp and can't recommend it anymore.

As alternatives I can recommend Sanic or Starlette.

@czepiec

czepiec commented Jul 5, 2021

I have exactly the same issue with the WebSocket client. When I drop the connection (by the firewall or simply by turning off WiFi) while awaiting the receive() method of a ClientWebSocketResponse instance (and there is nothing to receive), the method only returns after 15 minutes. Heartbeat is enabled.

I found that the workaround is to create a client with a connector with the parameter enable_cleanup_closed=True. In that case, the ws.receive() returns CLOSED in a short time.

From docs:

enable_cleanup_closed (bool) – some SSL servers do not properly complete SSL shutdown process, in that case asyncio leaks ssl connections. If this parameter is set to True, aiohttp additionally aborts underlining transport after 2 seconds. It is off by default.

But the problem I mentioned has nothing to do with SSL servers, it happens when the connection is dropped. So this is just a workaround.
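For completeness, the reproduction above with that workaround applied might look like this (same echo server URL as before; the only change is the explicit connector):

from asyncio import run
from aiohttp import ClientSession, TCPConnector

async def main():
    # Workaround: abort lingering transports shortly after close.
    connector = TCPConnector(enable_cleanup_closed=True)
    async with ClientSession(connector=connector) as session:
        async with session.ws_connect('https://echo.websocket.org/',
                                      heartbeat=10) as ws:
            print('Connected', ws)
            print('Received', await ws.receive())

run(main())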

@Hanaasagi
Member

The connection will not be dropped when you turn off WiFi. TCP establishes a logical circuit between client and server; even if the physical link is cut, the TCP connection still exists. This can be verified with the command lsof -p 62142 -nP | grep TCP. I'm curious why it's 15 minutes, and I'm trying to find out what happens.

@Hanaasagi
Member

Hanaasagi commented Jul 6, 2021

The receive method has a timeout argument whose default value is None, meaning no timeout for reading from the connection. The websocket pong frame and user data frames share the same timeout. So if aiohttp sends a ping frame successfully and we then turn off the network, the receive coroutine will block at msg = await self._reader.read(), because no data arrives from the server.

async def receive(self, timeout: Optional[float] = None) -> WSMessage:
    while True:
        if self._waiting is not None:
            raise RuntimeError("Concurrent call to receive() is not allowed")
        if self._closed:
            return WS_CLOSED_MESSAGE
        elif self._closing:
            await self.close()
            return WS_CLOSED_MESSAGE
        try:
            self._waiting = self._loop.create_future()
            try:
                with async_timeout.timeout(
                    timeout or self._receive_timeout, loop=self._loop
                ):
                    msg = await self._reader.read()
                self._reset_heartbeat()
            finally:
                waiter = self._waiting
                self._waiting = None
                set_result(waiter, True)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            self._close_code = 1006
            raise

Even if we call _pong_not_received, it doesn't cancel the receive coroutine.

def _pong_not_received(self) -> None:
    if not self._closed:
        self._closed = True
        self._close_code = 1006
        self._exception = asyncio.TimeoutError()
        self._response.close()

The receive coroutine does not finish until an EofStream error arrives from the TCP stack.

@Hanaasagi
Member

A straightforward solution is to use an exception to wake the receive coroutine.

But in my opinion, the heartbeat should be an independent task. There should be an internal _receive coroutine that reads data from the connection and dispatches it: to the receive coroutine for the user, and to the heartbeat coroutine for control.

A heartbeat task would work like this:

self._receive_pong = False
while True:
    # send ping to connection
    await sleep(10)
    # check self._receive_pong
    self._receive_pong = False

An internal _receive would look like this:

async with async_timeout.timeout(
    timeout or self._timeout.ws_receive
):
    msg = await self._reader.read()
if msg.type == WSMsgType.PONG and self._autoping:
    self._receive_pong = True  # record that the pong arrived
else:
    ...  # feed data to the `receive` coroutine

And the public receive() for the user:

async def receive():
    # Wait for data fed by `_receive`, or raise an exception set by the heartbeat.
    ...

Hanaasagi added a commit to Hanaasagi/aiohttp that referenced this issue Jul 8, 2021
…if network is suddenly interrupted.

Related to aio-libs#2309; if we call `receive()` with an unlimited timeout,
it will get stuck for a long time, because `_pong_not_received` has no way
to wake a blocked `receive()` coroutine. This PR adds an `asyncio.Event` to
wake this coroutine.
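The general shape of that wake-up pattern, sketched independently of the actual PR code (the class and method names below are illustrative only): a reader blocked on incoming data can also be woken by an event that the heartbeat sets when the pong is missed.

import asyncio

class DemoReader:
    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed_event = asyncio.Event()

    def pong_not_received(self):
        # Called by the heartbeat timer: wake any pending read immediately.
        self._closed_event.set()

    def feed(self, msg):
        self._queue.put_nowait(msg)

    async def read(self):
        get_task = asyncio.ensure_future(self._queue.get())
        closed_task = asyncio.ensure_future(self._closed_event.wait())
        done, pending = await asyncio.wait(
            {get_task, closed_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if closed_task in done:
            raise asyncio.TimeoutError('pong not received')
        return get_task.result()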
@wiryonolau

Hi, my problem happens when aiohttp runs in SSL mode: it doesn't detect an abrupt client disconnect. Does this happen in your case, @frederikaalund?

Latent-Logic added a commit to Latent-Logic/pyTwitchAPI that referenced this issue Oct 15, 2023
It turns out that aiohttp WebSocket code does not recognize if the
underlying connection is broken:
aio-libs/aiohttp#2309
As a twitch client we know we'll at minimum receive a PING request every
5 minutes even if no other channel updates are happening:
https://dev.twitch.tv/docs/irc/#keepalive-messages
(I tested this and saw that a ping request always arrived every
4min 1sec to 4min 51sec)

If we miss a PING and fail to respond, Twitch considers the connection
closed. Thus we can add a timeout to our websocket receive() call: if
we've heard nothing in over 5 minutes, our connection is broken and we
should perform a reconnect (see the sketch after the list below). This
has been tested by running the bot and then trying 3 different types of
connection breaking:
1) Disabling the network interface on my computer for 5 minutes
2) Suspending laptop and resuming later
3) Dropping related active NAT state on router (similar to rebooting router)
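A hedged sketch of the approach that commit describes (the timeout value and reconnect backoff are illustrative; a real client would also answer the PING):

import asyncio
import aiohttp

KEEPALIVE_TIMEOUT = 5 * 60 + 30  # Twitch PINGs at least every ~5 minutes

async def run_client(url):
    async with aiohttp.ClientSession() as session:
        while True:  # reconnect loop
            try:
                async with session.ws_connect(url) as ws:
                    while True:
                        # If nothing arrives for the whole window, assume the
                        # underlying connection is broken and reconnect.
                        msg = await ws.receive(timeout=KEEPALIVE_TIMEOUT)
                        if msg.type in (aiohttp.WSMsgType.CLOSED,
                                        aiohttp.WSMsgType.ERROR):
                            break
                        # handle PING requests and channel updates here
            except (asyncio.TimeoutError, aiohttp.ClientError):
                pass  # fall through to reconnect
            await asyncio.sleep(1.0)  # small backoff before reconnecting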
@bdraco
Member

bdraco commented Aug 17, 2024

There are a few cases here. We can fix the connection not closing and the exception not being set when the heartbeat times out.

If the heartbeat is disabled and the connection is dropped between hosts without any traffic, asyncio won't know until the next send fails, since receive has no way of knowing the connection has gone away unless there is an RST.
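A minimal sketch of what detection via the write path looks like in that no-heartbeat case (aiohttp 3.x, where send_bytes is a coroutine; the payload and interval are arbitrary):

import asyncio

async def sender(ws):
    # With the heartbeat disabled, a dead peer only becomes visible once a
    # write finally fails; until then writes may simply keep buffering.
    try:
        while True:
            await ws.send_bytes(b'payload')
            await asyncio.sleep(1.0)
    except ConnectionResetError:
        pass  # the transport was closed; clean up or reconnect here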

bdraco pushed a commit that referenced this issue Aug 17, 2024
… task not being consumed (#8729)

Co-authored-by: J. Nick Koston <[email protected]>
closes #7238
fixes #5182
fixes #4153
fixes #2309
bdraco pushed a commit that referenced this issue Aug 17, 2024
… task not being consumed (#8730)

Co-authored-by: J. Nick Koston <[email protected]>
closes #7238
fixes #5182
fixes #4153
fixes #2309