Implemented readuntil in StreamResponse (#4734) (#5196)
Co-authored-by: Anas <[email protected]>
Co-authored-by: Andrew Svetlov <[email protected]>
3 people authored Nov 2, 2020
1 parent bed8b87 commit 8ce3013
Showing 4 changed files with 144 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES/4054.feature
@@ -0,0 +1 @@
+Implemented readuntil in StreamResponse
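
The feature line above is terse; to make the behaviour concrete, here is a minimal, hypothetical sketch of a server handler that consumes a "#"-delimited request body with the new method (the route, handler name, and delimiter are illustrative and not part of this commit):

from aiohttp import web

async def handle_records(request: web.Request) -> web.Response:
    # Read the request body one '#'-terminated record at a time.
    records = []
    while True:
        chunk = await request.content.readuntil(b"#")
        if not chunk:
            break  # EOF reached and the internal buffer is empty
        records.append(chunk.rstrip(b"#"))
    return web.Response(text="received %d records" % len(records))

app = web.Application()
app.add_routes([web.post("/records", handle_records)])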
29 changes: 19 additions & 10 deletions aiohttp/streams.py
@@ -310,34 +310,41 @@ async def _wait(self, func_name: str) -> None:
             self._waiter = None
 
     async def readline(self) -> bytes:
+        return await self.readuntil()
+
+    async def readuntil(self, separator: bytes = b"\n") -> bytes:
+        seplen = len(separator)
+        if seplen == 0:
+            raise ValueError("Separator should be at least one-byte string")
+
         if self._exception is not None:
             raise self._exception
 
-        line = []
-        line_size = 0
+        chunk = b""
+        chunk_size = 0
         not_enough = True
 
         while not_enough:
             while self._buffer and not_enough:
                 offset = self._buffer_offset
-                ichar = self._buffer[0].find(b"\n", offset) + 1
-                # Read from current offset to found b'\n' or to the end.
+                ichar = self._buffer[0].find(separator, offset) + 1
+                # Read from current offset to found separator or to the end.
                 data = self._read_nowait_chunk(ichar - offset if ichar else -1)
-                line.append(data)
-                line_size += len(data)
+                chunk += data
+                chunk_size += len(data)
                 if ichar:
                     not_enough = False
 
-                if line_size > self._high_water:
-                    raise ValueError("Line is too long")
+                if chunk_size > self._high_water:
+                    raise ValueError("Chunk too big")
 
             if self._eof:
                 break
 
             if not_enough:
-                await self._wait("readline")
+                await self._wait("readuntil")
 
-        return b"".join(line)
+        return chunk
 
     async def read(self, n: int = -1) -> bytes:
         if self._exception is not None:
@@ -531,6 +538,8 @@ async def readline(self) -> bytes:
     async def read(self, n: int = -1) -> bytes:
         return b""
 
+    # TODO add async def readuntil
+
     async def readany(self) -> bytes:
         return b""
 
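The "# TODO add async def readuntil" marker in the EmptyStreamReader hunk above is left unresolved by this commit; a minimal sketch of what that method could look like, assuming it simply mirrors the class's other no-op readers, is:

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        # Sketch only: an empty stream never produces data, so return an
        # empty bytes object regardless of the separator, like read()/readline().
        return b""
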
13 changes: 13 additions & 0 deletions docs/streams.rst
@@ -70,6 +70,19 @@ Reading Methods
 
    :return bytes: the given line
 
+.. comethod:: StreamReader.readuntil(separator="\n")
+
+   Read until separator, where `separator` is a sequence of bytes.
+
+   If EOF is received, and `separator` was not found, the method will
+   return the partial read bytes.
+
+   If the EOF was received and the internal buffer is empty, return an
+   empty bytes object.
+
+   .. versionadded:: 3.8
+
+   :return bytes: the given data
+
 .. comethod:: StreamReader.readchunk()
 
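To complement the documentation added above: on the client side, ClientResponse.content is a StreamReader, so readuntil can drain a delimited stream incrementally. A hedged usage sketch (the URL and newline delimiter are illustrative):

import asyncio

import aiohttp


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get("http://example.com/events") as resp:
            while True:
                # Read up to and including the next newline; at EOF the
                # partial tail is returned without the separator, and an
                # empty bytes object signals that the stream is exhausted.
                chunk = await resp.content.readuntil(b"\n")
                if not chunk:
                    break
                print(chunk.rstrip(b"\n").decode())


asyncio.run(main())
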
111 changes: 111 additions & 0 deletions tests/test_streams.py
@@ -394,6 +394,117 @@ async def test_readline_exception(self) -> None:
         with pytest.raises(ValueError):
             await stream.readline()
 
+    async def test_readuntil(self) -> None:
+        loop = asyncio.get_event_loop()
+        # Read one chunk. 'readuntil' will need to wait for the data
+        # to come from 'cb'
+        stream = self._make_one()
+        stream.feed_data(b"chunk1 ")
+        read_task = loop.create_task(stream.readuntil(b"*"))
+
+        def cb():
+            stream.feed_data(b"chunk2 ")
+            stream.feed_data(b"chunk3 ")
+            stream.feed_data(b"* chunk4")
+
+        loop.call_soon(cb)
+
+        line = await read_task
+        assert b"chunk1 chunk2 chunk3 *" == line
+
+        stream.feed_eof()
+        data = await stream.read()
+        assert b" chunk4" == data
+
+    async def test_readuntil_limit_with_existing_data(self) -> None:
+        # Read one chunk. The data is in StreamReader's buffer
+        # before the event loop is run.
+
+        stream = self._make_one(limit=2)
+        stream.feed_data(b"li")
+        stream.feed_data(b"ne1&line2&")
+
+        with pytest.raises(ValueError):
+            await stream.readuntil(b"&")
+        # The buffer should contain the remaining data after exception
+        stream.feed_eof()
+        data = await stream.read()
+        assert b"line2&" == data
+
+    async def test_readuntil_limit(self) -> None:
+        loop = asyncio.get_event_loop()
+        # Read one chunk. StreamReaders are fed with data after
+        # their 'readuntil' methods are called.
+        stream = self._make_one(limit=4)
+
+        def cb():
+            stream.feed_data(b"chunk1")
+            stream.feed_data(b"chunk2$")
+            stream.feed_data(b"chunk3#")
+            stream.feed_eof()
+
+        loop.call_soon(cb)
+
+        with pytest.raises(ValueError):
+            await stream.readuntil(b"$")
+        data = await stream.read()
+        assert b"chunk3#" == data
+
+    async def test_readuntil_nolimit_nowait(self) -> None:
+        # All needed data for the first 'readuntil' call will be
+        # in the buffer.
+        stream = self._make_one()
+        data = b"line1!line2!line3!"
+        stream.feed_data(data[:6])
+        stream.feed_data(data[6:])
+
+        line = await stream.readuntil(b"!")
+        assert b"line1!" == line
+
+        stream.feed_eof()
+        data = await stream.read()
+        assert b"line2!line3!" == data
+
+    async def test_readuntil_eof(self) -> None:
+        stream = self._make_one()
+        stream.feed_data(b"some data")
+        stream.feed_eof()
+
+        line = await stream.readuntil(b"@")
+        assert b"some data" == line
+
+    async def test_readuntil_empty_eof(self) -> None:
+        stream = self._make_one()
+        stream.feed_eof()
+
+        line = await stream.readuntil(b"@")
+        assert b"" == line
+
+    async def test_readuntil_read_byte_count(self) -> None:
+        stream = self._make_one()
+        data = b"line1!line2!line3!"
+        stream.feed_data(data)
+
+        await stream.readuntil(b"!")
+
+        data = await stream.read(7)
+        assert b"line2!l" == data
+
+        stream.feed_eof()
+        data = await stream.read()
+        assert b"ine3!" == data
+
+    async def test_readuntil_exception(self) -> None:
+        stream = self._make_one()
+        stream.feed_data(b"line#")
+
+        data = await stream.readuntil(b"#")
+        assert b"line#" == data
+
+        stream.set_exception(ValueError())
+        with pytest.raises(ValueError):
+            await stream.readuntil(b"#")
+
     async def test_readexactly_zero_or_less(self) -> None:
         # Read exact number of bytes (zero or less).
         stream = self._make_one()
