diff --git a/nsq/async.py b/nsq/async.py index ab88c51..d44500d 100644 --- a/nsq/async.py +++ b/nsq/async.py @@ -486,7 +486,6 @@ def _on_data(self, data, **kwargs): frame, data = protocol.unpack_response(data) if frame == protocol.FRAME_TYPE_MESSAGE: self.last_msg_timestamp = time.time() - self.rdy = max(self.rdy - 1, 0) self.in_flight += 1 message = protocol.decode_message(data) diff --git a/nsq/reader.py b/nsq/reader.py index 6e11073..3db8c0e 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -317,21 +317,7 @@ def _on_message(self, conn, message, **kwargs): logger.exception('[%s:%s] failed to handle_message() %r', conn.id, self.name, message) def _handle_message(self, conn, message): - self.total_rdy = max(self.total_rdy - 1, 0) - - rdy_conn = conn - if len(self.conns) > self.max_in_flight and time.time() - self.random_rdy_ts > 30: - # if all connections aren't getting RDY - # occsionally randomize which connection gets RDY - self.random_rdy_ts = time.time() - conns_with_no_rdy = [c for c in itervalues(self.conns) if not c.rdy] - if conns_with_no_rdy: - rdy_conn = random.choice(conns_with_no_rdy) - if rdy_conn is not conn: - logger.info('[%s:%s] redistributing RDY to %s', - conn.id, self.name, rdy_conn.id) - - self._maybe_update_rdy(rdy_conn) + self._maybe_update_rdy(conn) success = False try: @@ -358,7 +344,7 @@ def _maybe_update_rdy(self, conn): if self.backoff_timer.get_interval() or self.max_in_flight == 0: return - if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25): + if conn.rdy == 1: self._send_rdy(conn, self._connection_max_in_flight()) def _finish_backoff_block(self): @@ -452,15 +438,10 @@ def _send_rdy(self, conn, value): if value > conn.max_rdy_count: value = conn.max_rdy_count - if (self.total_rdy + value) > self.max_in_flight: - if not conn.rdy: - # if we're going from RDY 0 to non-0 and we couldn't because - # of the configured max in flight, try again - rdy_retry_callback = functools.partial(self._rdy_retry, conn, value) - conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 5, rdy_retry_callback) + new_rdy = max(self.total_rdy - conn.rdy + value, 0) + if new_rdy > self.max_in_flight: return - new_rdy = max(self.total_rdy - conn.rdy + value, 0) if conn.send_rdy(value): self.total_rdy = new_rdy @@ -665,10 +646,21 @@ def _redistribute_rdy_state(self): logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name) self._send_rdy(conn, 0) + conns = self.conns.values() + + in_flight = [c for c in conns if c.in_flight] if backoff_interval: - max_in_flight = 1 - self.total_rdy + max_in_flight = max(0, 1 - len(in_flight)) else: - max_in_flight = self.max_in_flight - self.total_rdy + max_in_flight = self.max_in_flight - len(in_flight) + + # if moving any connections from RDY 0 to non-0 would violate in-flight constraints, + # set RDY 0 on some connection with msgs in flight so that a later redistribution + # round can proceed and we don't stay pinned to the same connections + if not max_in_flight: + c = random.choice(in_flight) + logger.info('[%s:%s] too many msgs in flight, giving up RDY count', c.id, self.name) + self._send_rdy(c, 0) # randomly walk the list of possible connections and send RDY 1 (up to our # calculated "max_in_flight"). We only need to send RDY 1 because in both @@ -677,7 +669,7 @@ def _redistribute_rdy_state(self): # We also don't attempt to avoid the connections who previously might have had RDY 1 # because it would be overly complicated and not actually worth it (ie. given enough # redistribution rounds it doesn't matter). - possible_conns = list(self.conns.values()) + possible_conns = [c for c in conns if not (c.in_flight or c.rdy)] while possible_conns and max_in_flight: max_in_flight -= 1 conn = possible_conns.pop(random.randrange(len(possible_conns))) diff --git a/tests/test_backoff.py b/tests/test_backoff.py index 3db5f08..b216da7 100644 --- a/tests/test_backoff.py +++ b/tests/test_backoff.py @@ -52,6 +52,7 @@ def _get_conn(reader): def _send_message(conn): msg = _get_message(conn) + conn.in_flight += 1 conn.trigger(event.MESSAGE, conn=conn, message=msg) return msg diff --git a/tests/test_reader.py b/tests/test_reader.py index 7d7f09e..e6e46aa 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -160,7 +160,7 @@ def test_conn_messages(self): def _on_message(*args, **kwargs): self.msg_count += 1 - if c.rdy == 0: + if c.in_flight == 5: self.stop() def _on_ready(*args, **kwargs):