Skip to content

Commit

Permalink
reader: Don't decrement total_rdy on message receipt. Adjust RDY
Browse files Browse the repository at this point in the history
redistribution logic accordingly.

This brings reader behavior into agreement with nsqd behavior (compare nsqio/nsq#404)
and removes an opportunity for max_in_flight violations (#177).
  • Loading branch information
Alp Aker committed May 10, 2017
1 parent 278f148 commit fa348cf
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 28 deletions.
1 change: 0 additions & 1 deletion nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ def _on_data(self, data, **kwargs):
frame, data = protocol.unpack_response(data)
if frame == protocol.FRAME_TYPE_MESSAGE:
self.last_msg_timestamp = time.time()
self.rdy = max(self.rdy - 1, 0)
self.in_flight += 1

message = protocol.decode_message(data)
Expand Down
44 changes: 18 additions & 26 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,21 +317,7 @@ def _on_message(self, conn, message, **kwargs):
logger.exception('[%s:%s] failed to handle_message() %r', conn.id, self.name, message)

def _handle_message(self, conn, message):
self.total_rdy = max(self.total_rdy - 1, 0)

rdy_conn = conn
if len(self.conns) > self.max_in_flight and time.time() - self.random_rdy_ts > 30:
# if all connections aren't getting RDY
# occsionally randomize which connection gets RDY
self.random_rdy_ts = time.time()
conns_with_no_rdy = [c for c in itervalues(self.conns) if not c.rdy]
if conns_with_no_rdy:
rdy_conn = random.choice(conns_with_no_rdy)
if rdy_conn is not conn:
logger.info('[%s:%s] redistributing RDY to %s',
conn.id, self.name, rdy_conn.id)

self._maybe_update_rdy(rdy_conn)
self._maybe_update_rdy(conn)

success = False
try:
Expand All @@ -358,7 +344,7 @@ def _maybe_update_rdy(self, conn):
if self.backoff_timer.get_interval() or self.max_in_flight == 0:
return

if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25):
if conn.rdy == 1:
self._send_rdy(conn, self._connection_max_in_flight())

def _finish_backoff_block(self):
Expand Down Expand Up @@ -452,15 +438,10 @@ def _send_rdy(self, conn, value):
if value > conn.max_rdy_count:
value = conn.max_rdy_count

if (self.total_rdy + value) > self.max_in_flight:
if not conn.rdy:
# if we're going from RDY 0 to non-0 and we couldn't because
# of the configured max in flight, try again
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback)
new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if new_rdy > self.max_in_flight:
return

new_rdy = max(self.total_rdy - conn.rdy + value, 0)
if conn.send_rdy(value):
self.total_rdy = new_rdy

Expand Down Expand Up @@ -665,10 +646,21 @@ def _redistribute_rdy_state(self):
logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name)
self._send_rdy(conn, 0)

conns = self.conns.values()

in_flight = [c for c in conns if c.in_flight]
if backoff_interval:
max_in_flight = 1 - self.total_rdy
max_in_flight = max(0, 1 - len(in_flight))
else:
max_in_flight = self.max_in_flight - self.total_rdy
max_in_flight = self.max_in_flight - len(in_flight)

# if moving any connections from RDY 0 to non-0 would violate in-flight constraints,
# set RDY 0 on some connection with msgs in flight so that a later redistribution
# round can proceed and we don't stay pinned to the same connections
if not max_in_flight:
c = random.choice(in_flight)
logger.info('[%s:%s] too many msgs in flight, giving up RDY count', c.id, self.name)
self._send_rdy(c, 0)

# randomly walk the list of possible connections and send RDY 1 (up to our
# calculated "max_in_flight"). We only need to send RDY 1 because in both
Expand All @@ -677,7 +669,7 @@ def _redistribute_rdy_state(self):
# We also don't attempt to avoid the connections who previously might have had RDY 1
# because it would be overly complicated and not actually worth it (ie. given enough
# redistribution rounds it doesn't matter).
possible_conns = list(self.conns.values())
possible_conns = [c for c in conns if not (c.in_flight or c.rdy)]
while possible_conns and max_in_flight:
max_in_flight -= 1
conn = possible_conns.pop(random.randrange(len(possible_conns)))
Expand Down
1 change: 1 addition & 0 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _get_conn(reader):

def _send_message(conn):
msg = _get_message(conn)
conn.in_flight += 1
conn.trigger(event.MESSAGE, conn=conn, message=msg)
return msg

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def test_conn_messages(self):

def _on_message(*args, **kwargs):
self.msg_count += 1
if c.rdy == 0:
if c.in_flight == 5:
self.stop()

def _on_ready(*args, **kwargs):
Expand Down

0 comments on commit fa348cf

Please sign in to comment.