Skip to content

Commit

Permalink
expose function to dynamically chagne max_in_flight count
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed May 12, 2015
1 parent b415cf0 commit 47f3a2e
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def _handle_message(self, conn, message):
return message.requeue()

def _maybe_update_rdy(self, conn):
if self.backoff_timer.get_interval():
if self.backoff_timer.get_interval() or self.max_in_flight == 0:
return

if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25):
Expand All @@ -366,7 +366,7 @@ def _finish_backoff_block(self):

# test the waters after finishing a backoff round
# if we have no connections, this will happen when a new connection gets RDY 1
if not self.conns:
if not self.conns or self.max_in_flight == 0:
return

conn = random.choice(self.conns.values())
Expand Down Expand Up @@ -436,7 +436,7 @@ def _send_rdy(self, conn, value):
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

if value and self.disabled():
if value and (self.disabled() or self.max_in_flight == 0):
logger.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name)
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.io_loop.add_timeout(time.time() + 15, rdy_retry_callback)
Expand Down Expand Up @@ -603,7 +603,24 @@ def _finish_query_lookupd(self, response, lookupd_url):
address = producer.get('broadcast_address', producer.get('address'))
assert address
self.connect_to_nsqd(address, producer['tcp_port'])


def set_max_in_flight(self, max_in_flight):
"""dynamically adjust the reader max_in_flight count. Set to 0 to immediately disable a Reader"""
assert isinstance(max_in_flight, int)
self.max_in_flight = max_in_flight

if max_in_flight == 0:
# set RDY 0 to all connections
for conn in self.conns.itervalues():
if conn.rdy > 0:
logger.debug('[%s:%s] rdy: %d -> 0', conn.id, self.name, conn.rdy)
self._send_rdy(conn, 0)
self.total_rdy = 0
else:
self.need_rdy_redistributed = True
self._redistribute_rdy_state()


def _redistribute_rdy_state(self):
# We redistribute RDY counts in a few cases:
#
Expand All @@ -618,7 +635,7 @@ def _redistribute_rdy_state(self):
if not self.conns:
return

if self.disabled() or self.backoff_block:
if self.disabled() or self.backoff_block or self.max_in_flight == 0:
return

if len(self.conns) > self.max_in_flight:
Expand Down

0 comments on commit 47f3a2e

Please sign in to comment.