From 3a4e5c844675dc61bc7390988a39e4f452dc2931 Mon Sep 17 00:00:00 2001 From: Hugo Lopes Tavares Date: Thu, 31 Jan 2019 16:38:24 -0500 Subject: [PATCH 1/2] Fix TCP reads to ignore half-baked messages A chunk is read from the network and added to a buffer (recv()), then if the buffer doesn't have enough information, the handler waits for the next `handle_read` call to get more information into the buffer. This works with simultaneous clients. The Redis codebase was the guide for this commit. --- dredis/parser.py | 42 +++++++++++++++---------------- dredis/server.py | 7 ++++-- tests/unit/test_redis_protocol.py | 33 +++++++++++++++++++----- 3 files changed, 53 insertions(+), 29 deletions(-) diff --git a/dredis/parser.py b/dredis/parser.py index 673f116..8367fe4 100644 --- a/dredis/parser.py +++ b/dredis/parser.py @@ -5,53 +5,53 @@ class Parser(object): def __init__(self, read_fn): self._buffer = "" + self._buffer_pos = 0 self._read_fn = read_fn def _readline(self): - if '\n' not in self._buffer: - self._read_into_buffer() - crlf_position = self._buffer.find(self.CRLF) - result = self._buffer[:crlf_position] - self._buffer = self._buffer[crlf_position + len(self.CRLF):] + if self.CRLF not in self._buffer[self._buffer_pos:]: + raise StopIteration() + crlf_position = self._buffer[self._buffer_pos:].find(self.CRLF) + result = self._buffer[self._buffer_pos:][:crlf_position] + self._buffer_pos += crlf_position + len(self.CRLF) return result - def _read_into_buffer(self, min_bytes=0): + def _read_into_buffer(self): + # FIXME: implement a maximum size for the buffer to prevent a crash due to bad clients data = self._read_fn(self.MAX_BUFSIZE) self._buffer += data - while data and len(self._buffer) < min_bytes: - data = self._read_fn(self.MAX_BUFSIZE) - self._buffer += data def _read(self, n_bytes): - if len(self._buffer) < n_bytes: - self._read_into_buffer(min_bytes=n_bytes) - result = self._buffer[:n_bytes] - self._buffer = self._buffer[n_bytes + 2:] + if len(self._buffer[self._buffer_pos:]) < n_bytes: + raise StopIteration() + result = self._buffer[self._buffer_pos:][:n_bytes] + # FIXME: ensure self.CRLF is next + self._buffer_pos += n_bytes + len(self.CRLF) return result def get_instructions(self): - if not self._buffer: - self._read_into_buffer() + self._read_into_buffer() while self._buffer: + self._buffer_pos = 0 instructions = self._readline() - if not instructions: - raise StopIteration() - # the Redis protocol says that all commands are arrays, however, # Redis's own tests have commands like PING being sent as a Simple String if instructions.startswith('+'): + self._buffer = self._buffer[self._buffer_pos:] yield instructions[1:].strip().split() - # if instructions.startswith('*'): elif instructions.startswith('*'): # array of instructions array_length = int(instructions[1:]) # skip '*' char instruction_set = [] for _ in range(array_length): - str_len = int(self._readline()[1:]) # skip '$' char + line = self._readline() + str_len = int(line[1:]) # skip '$' char instruction = self._read(str_len) instruction_set.append(instruction) + self._buffer = self._buffer[self._buffer_pos:] yield instruction_set else: # inline instructions, saw them in the Redis tests - for line in instructions.split('\r\n'): + for line in instructions.split(self.CRLF): + self._buffer = self._buffer[self._buffer_pos:] yield line.strip().split() diff --git a/dredis/server.py b/dredis/server.py index 9957ada..0df0bc9 100755 --- a/dredis/server.py +++ b/dredis/server.py @@ -75,9 +75,12 @@ def _transform(elem): class CommandHandler(asyncore.dispatcher): + def __init__(self, *args, **kwargs): + asyncore.dispatcher.__init__(self, *args, **kwargs) + self._parser = Parser(self.recv) # contains client message buffer + def handle_read(self): - parser = Parser(self.recv) - for cmd in parser.get_instructions(): + for cmd in self._parser.get_instructions(): logger.debug('{} data = {}'.format(self.addr, repr(cmd))) execute_cmd(self.keyspace, self.debug_send, *cmd) diff --git a/tests/unit/test_redis_protocol.py b/tests/unit/test_redis_protocol.py index f52d0f8..914dccb 100644 --- a/tests/unit/test_redis_protocol.py +++ b/tests/unit/test_redis_protocol.py @@ -1,3 +1,5 @@ +import pytest + from dredis.parser import Parser @@ -51,14 +53,33 @@ def read(n): assert list(p.get_instructions()) == [['PING'], ['PING']] -def test_parser_should_request_more_data_if_needed(): - responses = [ - "*1\r\n$4\r\n", - "PING\r\n" - ] +@pytest.mark.parametrize("line", ("*", "*1", "*1\r\n$4\r\n")) +def test_parser_should_ignore_half_sent_commands(line): + def read(bufsize): + return line + + p = Parser(read) + assert list(p.get_instructions()) == [] + + +def test_parser_should_work_with_chunks_sent_separately(): + responses = ["*1"] def read(bufsize): return responses.pop(0) p = Parser(read) - assert list(p.get_instructions()) == [['PING']] + + with pytest.raises(StopIteration): + next(p.get_instructions()) + + responses.append("\r\n$4\r") + with pytest.raises(StopIteration): + next(p.get_instructions()) + + responses.append("\nPIN") + with pytest.raises(StopIteration): + next(p.get_instructions()) + + responses.append("G\r\n") + assert next(p.get_instructions()) == ['PING'] From 4d49bd3a9422b067a390bc73403e18f92dd50cc5 Mon Sep 17 00:00:00 2001 From: Hugo Lopes Tavares Date: Thu, 31 Jan 2019 16:55:55 -0500 Subject: [PATCH 2/2] Ignore errno.EAGAIN exceptions Sometimes with non-blocking sockets they can receive EAGAIN, this is for extra safety and to avoid unnecessary crashes --- dredis/server.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dredis/server.py b/dredis/server.py index 0df0bc9..0022027 100755 --- a/dredis/server.py +++ b/dredis/server.py @@ -1,5 +1,6 @@ import argparse import asyncore +import errno import json import logging import os.path @@ -80,9 +81,16 @@ def __init__(self, *args, **kwargs): self._parser = Parser(self.recv) # contains client message buffer def handle_read(self): - for cmd in self._parser.get_instructions(): - logger.debug('{} data = {}'.format(self.addr, repr(cmd))) - execute_cmd(self.keyspace, self.debug_send, *cmd) + try: + for cmd in self._parser.get_instructions(): + logger.debug('{} data = {}'.format(self.addr, repr(cmd))) + execute_cmd(self.keyspace, self.debug_send, *cmd) + except socket.error as exc: + # try again later if no data is available + if exc.errno == errno.EAGAIN: + return + else: + raise def debug_send(self, *args): logger.debug("out={}".format(repr(args)))