Skip to content

Commit

Permalink
Merge pull request #15 from Yipit/bugfix/tcp-reads
Browse files Browse the repository at this point in the history
Improve TCP communication
  • Loading branch information
hltbra authored Feb 1, 2019
2 parents a36a232 + 4d49bd3 commit ce0d0f8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 31 deletions.
42 changes: 21 additions & 21 deletions dredis/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,53 @@ class Parser(object):

def __init__(self, read_fn):
self._buffer = ""
self._buffer_pos = 0
self._read_fn = read_fn

def _readline(self):
if '\n' not in self._buffer:
self._read_into_buffer()
crlf_position = self._buffer.find(self.CRLF)
result = self._buffer[:crlf_position]
self._buffer = self._buffer[crlf_position + len(self.CRLF):]
if self.CRLF not in self._buffer[self._buffer_pos:]:
raise StopIteration()
crlf_position = self._buffer[self._buffer_pos:].find(self.CRLF)
result = self._buffer[self._buffer_pos:][:crlf_position]
self._buffer_pos += crlf_position + len(self.CRLF)
return result

def _read_into_buffer(self, min_bytes=0):
def _read_into_buffer(self):
# FIXME: implement a maximum size for the buffer to prevent a crash due to bad clients
data = self._read_fn(self.MAX_BUFSIZE)
self._buffer += data
while data and len(self._buffer) < min_bytes:
data = self._read_fn(self.MAX_BUFSIZE)
self._buffer += data

def _read(self, n_bytes):
if len(self._buffer) < n_bytes:
self._read_into_buffer(min_bytes=n_bytes)
result = self._buffer[:n_bytes]
self._buffer = self._buffer[n_bytes + 2:]
if len(self._buffer[self._buffer_pos:]) < n_bytes:
raise StopIteration()
result = self._buffer[self._buffer_pos:][:n_bytes]
# FIXME: ensure self.CRLF is next
self._buffer_pos += n_bytes + len(self.CRLF)
return result

def get_instructions(self):
if not self._buffer:
self._read_into_buffer()
self._read_into_buffer()
while self._buffer:
self._buffer_pos = 0
instructions = self._readline()
if not instructions:
raise StopIteration()

# the Redis protocol says that all commands are arrays, however,
# Redis's own tests have commands like PING being sent as a Simple String
if instructions.startswith('+'):
self._buffer = self._buffer[self._buffer_pos:]
yield instructions[1:].strip().split()
# if instructions.startswith('*'):
elif instructions.startswith('*'):
# array of instructions
array_length = int(instructions[1:]) # skip '*' char
instruction_set = []
for _ in range(array_length):
str_len = int(self._readline()[1:]) # skip '$' char
line = self._readline()
str_len = int(line[1:]) # skip '$' char
instruction = self._read(str_len)
instruction_set.append(instruction)
self._buffer = self._buffer[self._buffer_pos:]
yield instruction_set
else:
# inline instructions, saw them in the Redis tests
for line in instructions.split('\r\n'):
for line in instructions.split(self.CRLF):
self._buffer = self._buffer[self._buffer_pos:]
yield line.strip().split()
19 changes: 15 additions & 4 deletions dredis/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import asyncore
import errno
import json
import logging
import os.path
Expand Down Expand Up @@ -75,11 +76,21 @@ def _transform(elem):

class CommandHandler(asyncore.dispatcher):

def __init__(self, *args, **kwargs):
asyncore.dispatcher.__init__(self, *args, **kwargs)
self._parser = Parser(self.recv) # contains client message buffer

def handle_read(self):
parser = Parser(self.recv)
for cmd in parser.get_instructions():
logger.debug('{} data = {}'.format(self.addr, repr(cmd)))
execute_cmd(self.keyspace, self.debug_send, *cmd)
try:
for cmd in self._parser.get_instructions():
logger.debug('{} data = {}'.format(self.addr, repr(cmd)))
execute_cmd(self.keyspace, self.debug_send, *cmd)
except socket.error as exc:
# try again later if no data is available
if exc.errno == errno.EAGAIN:
return
else:
raise

def debug_send(self, *args):
logger.debug("out={}".format(repr(args)))
Expand Down
33 changes: 27 additions & 6 deletions tests/unit/test_redis_protocol.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from dredis.parser import Parser


Expand Down Expand Up @@ -51,14 +53,33 @@ def read(n):
assert list(p.get_instructions()) == [['PING'], ['PING']]


def test_parser_should_request_more_data_if_needed():
responses = [
"*1\r\n$4\r\n",
"PING\r\n"
]
@pytest.mark.parametrize("line", ("*", "*1", "*1\r\n$4\r\n"))
def test_parser_should_ignore_half_sent_commands(line):
def read(bufsize):
return line

p = Parser(read)
assert list(p.get_instructions()) == []


def test_parser_should_work_with_chunks_sent_separately():
responses = ["*1"]

def read(bufsize):
return responses.pop(0)

p = Parser(read)
assert list(p.get_instructions()) == [['PING']]

with pytest.raises(StopIteration):
next(p.get_instructions())

responses.append("\r\n$4\r")
with pytest.raises(StopIteration):
next(p.get_instructions())

responses.append("\nPIN")
with pytest.raises(StopIteration):
next(p.get_instructions())

responses.append("G\r\n")
assert next(p.get_instructions()) == ['PING']

0 comments on commit ce0d0f8

Please sign in to comment.